Skip to content

Commit

Permalink
Improve .pipe()
Browse files Browse the repository at this point in the history
  • Loading branch information
ehmicky committed Feb 18, 2024
1 parent 28acdbc commit 2475212
Show file tree
Hide file tree
Showing 23 changed files with 1,506 additions and 250 deletions.
10 changes: 6 additions & 4 deletions docs/scripts.md
Original file line number Diff line number Diff line change
Expand Up @@ -588,17 +588,19 @@ console.log('example');
```sh
# Bash
echo example | cat
echo npm run build | sort | head -n2
```
```js
// zx
await $`echo example | cat`;
await $`npm run build | sort | head -n2`;
```
```js
// Execa
await $`echo example`.pipe($({stdin: 'pipe'})`cat`);
await $`npm run build`
.pipe($({stdin: 'pipe'})`sort`)
.pipe($({stdin: 'pipe'})`head -n2`);
```
### Piping stdout and stderr to another command
Expand All @@ -619,7 +621,7 @@ await Promise.all([echo, cat]);
```js
// Execa
await $({all: true})`echo example`.pipe($({stdin: 'pipe'})`cat`, 'all');
await $({all: true})`echo example`.pipe($({from: 'all', stdin: 'pipe'})`cat`);
```
### Piping stdout to a file
Expand Down
49 changes: 36 additions & 13 deletions index.d.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import {type ChildProcess} from 'node:child_process';
import {type Readable, type Writable} from 'node:stream';

type IfAsync<IsSync extends boolean, AsyncValue> = IsSync extends true ? never : AsyncValue;
type IfAsync<IsSync extends boolean, AsyncValue, SyncValue = never> = IsSync extends true ? SyncValue : AsyncValue;

type NoOutputStdioOption =
| 'ignore'
Expand Down Expand Up @@ -732,6 +732,13 @@ type ExecaCommonReturnValue<IsSync extends boolean = boolean, OptionsType extend
This is an array if the `lines` option is `true, or if either the `stdout` or `stderr` option is a transform in object mode.
*/
all: IfAsync<IsSync, AllOutput<OptionsType>>;

/**
Results of the other processes that were piped into this child process. This is useful to inspect a series of child processes piped with each other.
This array is initially empty and is populated each time the `.pipe()` method resolves.
*/
pipedFrom: IfAsync<IsSync, ExecaReturnValue[], []>;
// Workaround for a TypeScript bug: https://github.com/microsoft/TypeScript/issues/57062
} & {};

Expand Down Expand Up @@ -795,6 +802,33 @@ type AllIfStderr<StderrResultIgnored extends boolean> = StderrResultIgnored exte
? undefined
: Readable;

type PipeOptions = {
/**
Which stream to pipe. A file descriptor number can also be passed.
`"all"` pipes both `stdout` and `stderr`. This requires the `all` option to be `true`.
*/
readonly from?: 'stdout' | 'stderr' | 'all' | number;

/**
Unpipe the child process when the signal aborts.
The `.pipe()` method will be rejected with a cancellation error.
*/
readonly signal?: AbortSignal;
};

type PipableProcess = {
/**
[Pipe](https://nodejs.org/api/stream.html#readablepipedestination-options) the child process' `stdout` to a second Execa child process' `stdin`. This resolves with that second process' result. If either process is rejected, this is rejected with either process' error instead.
This can be called multiple times to chain a series of processes.
Multiple child processes can be piped to the same process. Conversely, the same child process can be piped to multiple other processes.
*/
pipe<Destination extends ExecaChildProcess>(destination: Destination, options?: PipeOptions): Promise<Awaited<Destination>> & PipableProcess;
};

export type ExecaChildPromise<OptionsType extends Options = Options> = {
stdout: StreamUnlessIgnored<'1', OptionsType>;

Expand All @@ -813,17 +847,6 @@ export type ExecaChildPromise<OptionsType extends Options = Options> = {
onRejected?: (reason: ExecaError<OptionsType>) => ResultType | PromiseLike<ResultType>
): Promise<ExecaReturnValue<OptionsType> | ResultType>;

/**
[Pipe](https://nodejs.org/api/stream.html#readablepipedestination-options) the child process' `stdout` to another Execa child process' `stdin`.
A `streamName` can be passed to pipe `"stderr"`, `"all"` (both `stdout` and `stderr`) or any another file descriptor instead of `stdout`.
`childProcess.stdout` (and/or `childProcess.stderr` depending on `streamName`) must not be `undefined`. When `streamName` is `"all"`, the `all` option must be set to `true`.
Returns `execaChildProcess`, which allows chaining `.pipe()` then `await`ing the final result.
*/
pipe<Target extends ExecaChildProcess>(target: Target, streamName?: 'stdout' | 'stderr' | 'all' | number): Target;

/**
Sends a [signal](https://nodejs.org/api/os.html#signal-constants) to the child process. The default signal is the `killSignal` option. `killSignal` defaults to `SIGTERM`, which terminates the child process.
Expand All @@ -834,7 +857,7 @@ export type ExecaChildPromise<OptionsType extends Options = Options> = {
[More info.](https://nodejs.org/api/child_process.html#subprocesskillsignal)
*/
kill(signalOrError: Parameters<ChildProcess['kill']>[0] | Error): ReturnType<ChildProcess['kill']>;
};
} & PipableProcess;

export type ExecaChildProcess<OptionsType extends Options = Options> = ChildProcess &
ExecaChildPromise<OptionsType> &
Expand Down
2 changes: 1 addition & 1 deletion index.js
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import {handleNodeOption} from './lib/node.js';
import {handleInputAsync, pipeOutputAsync, cleanupStdioStreams} from './lib/stdio/async.js';
import {handleInputSync, pipeOutputSync} from './lib/stdio/sync.js';
import {spawnedKill, validateTimeout, normalizeForceKillAfterDelay, cleanupOnExit, isFailedExit} from './lib/kill.js';
import {pipeToProcess} from './lib/pipe.js';
import {pipeToProcess} from './lib/pipe/setup.js';
import {getSpawnedResult, makeAllStream} from './lib/stream.js';
import {mergePromise} from './lib/promise.js';
import {joinCommand} from './lib/escape.js';
Expand Down
35 changes: 23 additions & 12 deletions index.test-d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import * as process from 'node:process';
import {Readable, Writable} from 'node:stream';
import {createWriteStream} from 'node:fs';
import {expectType, expectError, expectAssignable, expectNotAssignable} from 'tsd';
import {expectType, expectNotType, expectError, expectAssignable, expectNotAssignable} from 'tsd';
import {
$,
execa,
Expand Down Expand Up @@ -78,26 +78,34 @@ const asyncFinal = async function * () {

try {
const execaPromise = execa('unicorns', {all: true});
const unicornsResult = await execaPromise;

const execaBufferPromise = execa('unicorns', {encoding: 'buffer', all: true});
const writeStream = createWriteStream('output.txt');
const bufferResult = await execaBufferPromise;

expectType<typeof execaPromise>(execaBufferPromise.pipe(execaPromise));
expectError(execaBufferPromise.pipe(writeStream));
expectError(execaBufferPromise.pipe('output.txt'));
await execaBufferPromise.pipe(execaPromise, 'stdout');
await execaBufferPromise.pipe(execaPromise, 'stderr');
await execaBufferPromise.pipe(execaPromise, 'all');
await execaBufferPromise.pipe(execaPromise, 3);
expectError(execaBufferPromise.pipe(execaPromise, 'other'));
expectType<typeof bufferResult>(await execaPromise.pipe(execaBufferPromise));
expectNotType<typeof bufferResult>(await execaPromise.pipe(execaPromise));
expectType<typeof bufferResult>(await execaPromise.pipe(execaPromise).pipe(execaBufferPromise));
await execaPromise.pipe(execaPromise).pipe(execaBufferPromise, {from: 'stdout'});
expectError(execaPromise.pipe(execaBufferPromise).stdout);
expectError(execaPromise.pipe(createWriteStream('output.txt')));
expectError(execaPromise.pipe('output.txt'));
await execaPromise.pipe(execaBufferPromise, {});
expectError(execaPromise.pipe(execaBufferPromise, 'stdout'));
await execaPromise.pipe(execaBufferPromise, {from: 'stdout'});
await execaPromise.pipe(execaBufferPromise, {from: 'stderr'});
await execaPromise.pipe(execaBufferPromise, {from: 'all'});
await execaPromise.pipe(execaBufferPromise, {from: 3});
expectError(execaPromise.pipe(execaBufferPromise, {from: 'other'}));
await execaPromise.pipe(execaBufferPromise, {signal: new AbortController().signal});
expectError(await execaPromise.pipe(execaBufferPromise, {signal: true}));

expectType<Readable>(execaPromise.all);
const noAllPromise = execa('unicorns');
expectType<undefined>(noAllPromise.all);
const noAllResult = await noAllPromise;
expectType<undefined>(noAllResult.all);

const unicornsResult = await execaPromise;
expectType<string>(unicornsResult.command);
expectType<string>(unicornsResult.escapedCommand);
expectType<number | undefined>(unicornsResult.exitCode);
Expand All @@ -108,6 +116,7 @@ try {
expectType<string | undefined>(unicornsResult.signal);
expectType<string | undefined>(unicornsResult.signalDescription);
expectType<string>(unicornsResult.cwd);
expectType<ExecaReturnValue[]>(unicornsResult.pipedFrom);

expectType<undefined>(unicornsResult.stdio[0]);
expectType<string>(unicornsResult.stdout);
Expand All @@ -120,7 +129,6 @@ try {
expectType<Readable>(execaBufferPromise.stdout);
expectType<Readable>(execaBufferPromise.stderr);
expectType<Readable>(execaBufferPromise.all);
const bufferResult = await execaBufferPromise;
expectType<Uint8Array>(bufferResult.stdout);
expectType<Uint8Array>(bufferResult.stdio[1]);
expectType<Uint8Array>(bufferResult.stderr);
Expand Down Expand Up @@ -405,6 +413,7 @@ try {
expectType<string>(execaError.cwd);
expectType<string>(execaError.shortMessage);
expectType<string | undefined>(execaError.originalMessage);
expectType<ExecaReturnValue[]>(execaError.pipedFrom);

expectType<undefined>(execaError.stdio[0]);

Expand Down Expand Up @@ -550,6 +559,7 @@ try {
expectType<string | undefined>(unicornsResult.signal);
expectType<string | undefined>(unicornsResult.signalDescription);
expectType<string>(unicornsResult.cwd);
expectType<[]>(unicornsResult.pipedFrom);

expectType<undefined>(unicornsResult.stdio[0]);
expectType<string>(unicornsResult.stdout);
Expand Down Expand Up @@ -621,6 +631,7 @@ try {
expectType<string>(execaError.cwd);
expectType<string>(execaError.shortMessage);
expectType<string | undefined>(execaError.originalMessage);
expectType<[]>(execaError.pipedFrom);

expectType<undefined>(execaError.stdio[0]);

Expand Down
3 changes: 3 additions & 0 deletions lib/error.js
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,8 @@ export const makeError = ({
delete error.bufferedData;
}

error.pipedFrom = [];

return error;
};

Expand Down Expand Up @@ -185,4 +187,5 @@ export const makeSuccessResult = ({
stderr: stdio[2],
all,
stdio,
pipedFrom: [],
});
95 changes: 0 additions & 95 deletions lib/pipe.js

This file was deleted.

13 changes: 13 additions & 0 deletions lib/pipe/abort.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
import {aborted} from 'node:util';
import {createNonCommandError} from './validate.js';

export const unpipeOnAbort = (signal, ...args) => signal === undefined
? []
: [unpipeOnSignalAbort(signal, ...args)];

const unpipeOnSignalAbort = async (signal, sourceStream, mergedStream, {stdioStreamsGroups, options}) => {
await aborted(signal, sourceStream);
await mergedStream.remove(sourceStream);
const error = new Error('Pipe cancelled by `signal` option.');
throw createNonCommandError({error, stdioStreamsGroups, options});
};
24 changes: 24 additions & 0 deletions lib/pipe/sequence.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
// Like Bash, we await both processes. This is unlike some other shells which only await the destination process.
// Like Bash with the `pipefail` option, if either process fails, the whole pipe fails.
// Like Bash, if both process fails, we return the failure of the destination.
// This ensures both processes' error is present, using `error.pipedFrom`.
export const waitForBothProcesses = async (source, destination) => {
const [
{status: sourceStatus, reason: sourceReason, value: sourceResult = sourceReason},
{status: destinationStatus, reason: destinationReason, value: destinationResult = destinationReason},
] = await Promise.allSettled([source, destination]);

if (!destinationResult.pipedFrom.includes(sourceResult)) {
destinationResult.pipedFrom.push(sourceResult);
}

if (destinationStatus === 'rejected') {
throw destinationResult;
}

if (sourceStatus === 'rejected') {
throw sourceResult;
}

return destinationResult;
};
25 changes: 25 additions & 0 deletions lib/pipe/setup.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
import {normalizePipeArguments} from './validate.js';
import {waitForBothProcesses} from './sequence.js';
import {pipeProcessStream} from './streaming.js';
import {unpipeOnAbort} from './abort.js';

// Pipe a process' `stdout`/`stderr`/`stdio` into another process' `stdin`
export const pipeToProcess = (sourceInfo, destination, pipeOptions) => {
const promise = handlePipePromise(sourceInfo, destination, pipeOptions);
promise.pipe = pipeToProcess.bind(undefined, {...sourceInfo, spawned: destination});
return promise;
};

const handlePipePromise = async (sourceInfo, destination, {from, signal} = {}) => {
const {source, sourceStream, destinationStream} = normalizePipeArguments(destination, from, sourceInfo);
const maxListenersController = new AbortController();
try {
const mergedStream = pipeProcessStream(sourceStream, destinationStream, maxListenersController);
return await Promise.race([
waitForBothProcesses(source, destination),
...unpipeOnAbort(signal, sourceStream, mergedStream, sourceInfo),
]);
} finally {
maxListenersController.abort();
}
};

0 comments on commit 2475212

Please sign in to comment.