Skip to content

Commit

Permalink
feat(core): run commands directly
Browse files Browse the repository at this point in the history
  • Loading branch information
xiongemi committed Mar 12, 2024
1 parent 9520aa2 commit 754ddf0
Show file tree
Hide file tree
Showing 7 changed files with 208 additions and 93 deletions.
170 changes: 111 additions & 59 deletions packages/nx/src/executors/run-commands/run-commands.impl.ts
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ export interface RunCommandsOptions extends Json {
args?: string | string[];
envFile?: string;
__unparsed__: string[];
usePty?: boolean;
streamOutput?: boolean;
}

const propKeys = [
Expand All @@ -64,7 +66,8 @@ const propKeys = [
'envFile',
'__unparsed__',
'env',
'mode',
'usePty',
'streamOutput',
'verbose',
];

Expand All @@ -87,6 +90,7 @@ export default async function (
context: ExecutorContext
): Promise<{
success: boolean;
terminalOutput: string;
}> {
await loadEnvVars(options.envFile);
const normalized = normalizeOptions(options);
Expand All @@ -107,10 +111,10 @@ export default async function (
}

try {
const success = options.parallel
const result = options.parallel
? await runInParallel(normalized, context)
: await runSerially(normalized, context);
return { success };
return result;
} catch (e) {
if (process.env.NX_VERBOSE_LOGGING === 'true') {
console.error(e);
Expand All @@ -124,43 +128,67 @@ export default async function (
async function runInParallel(
options: NormalizedRunCommandsOptions,
context: ExecutorContext
) {
): Promise<{ success: boolean; terminalOutput: string }> {
const procs = options.commands.map((c) =>
createProcess(
c,
options.readyWhen,
options.color,
calculateCwd(options.cwd, context),
options.env ?? {},
true
).then((result) => ({
true,
options.usePty,
options.streamOutput
).then((result: { success: boolean; terminalOutput: string }) => ({
result,
command: c.command,
}))
);

let terminalOutput = '';
if (options.readyWhen) {
const r = await Promise.race(procs);
if (!r.result) {
process.stderr.write(
`Warning: command "${r.command}" exited with non-zero status code`
);
return false;
const r: {
result: { success: boolean; terminalOutput: string };
command: string;
} = await Promise.race(procs);
terminalOutput += r.result.terminalOutput;
if (!r.result.success) {
const output = `Warning: command "${r.command}" exited with non-zero status code`;
terminalOutput += output;
if (options.streamOutput) {
process.stderr.write(output);
}
return { success: false, terminalOutput };
} else {
return true;
return { success: true, terminalOutput };
}
} else {
const r = await Promise.all(procs);
const failed = r.filter((v) => !v.result);
const r: {
result: { success: boolean; terminalOutput: string };
command: string;
}[] = await Promise.all(procs);
terminalOutput += r.map((f) => f.result.terminalOutput).join('');
const failed = r.filter((v) => !v.result.success);
if (failed.length > 0) {
failed.forEach((f) => {
process.stderr.write(
`Warning: command "${f.command}" exited with non-zero status code`
);
});
return false;
const output = failed
.map(
(f) =>
`Warning: command "${f.command}" exited with non-zero status code`
)
.join('\r\n');
terminalOutput += output;
if (options.streamOutput) {
process.stderr.write(output);
}
return {
success: false,
terminalOutput,
};
} else {
return true;
return {
success: true,
terminalOutput,
};
}
}
}
Expand Down Expand Up @@ -213,25 +241,31 @@ function normalizeOptions(
async function runSerially(
options: NormalizedRunCommandsOptions,
context: ExecutorContext
) {
): Promise<{ success: boolean; terminalOutput: string }> {
let terminalOutput = '';
for (const c of options.commands) {
const success = await createProcess(
c,
undefined,
options.color,
calculateCwd(options.cwd, context),
options.env ?? {},
false
);
if (!success) {
process.stderr.write(
`Warning: command "${c.command}" exited with non-zero status code`
const result: { success: boolean; terminalOutput: string } =
await createProcess(
c,
undefined,
options.color,
calculateCwd(options.cwd, context),
options.env ?? {},
false,
options.usePty,
options.streamOutput
);
return false;
terminalOutput += result.terminalOutput;
if (!result.success) {
const output = `Warning: command "${c.command}" exited with non-zero status code`;
result.terminalOutput += output;
if (options.streamOutput) {
process.stderr.write(output);
}
return { success: false, terminalOutput };
}
}

return true;
return { success: true, terminalOutput };
}

async function createProcess(
Expand All @@ -245,41 +279,44 @@ async function createProcess(
color: boolean,
cwd: string,
env: Record<string, string>,
isParallel: boolean
): Promise<boolean> {
isParallel: boolean,
usePty: boolean = true,
streamOutput: boolean = true
): Promise<{ success: boolean; terminalOutput: string }> {
env = processEnv(color, cwd, env);
// The rust runCommand is always a tty, so it will not look nice in parallel and if we need prefixes
// currently does not work properly in windows
if (
process.env.NX_NATIVE_COMMAND_RUNNER !== 'false' &&
process.stdout.isTTY &&
!commandConfig.prefix &&
!isParallel
!isParallel &&
usePty
) {
const cp = new PseudoTtyProcess(
runCommand(commandConfig.command, cwd, env)
runCommand(commandConfig.command, cwd, env, !streamOutput)
);

let terminalOutput = '';
return new Promise((res) => {
cp.onOutput((output) => {
terminalOutput += output;
if (readyWhen && output.indexOf(readyWhen) > -1) {
res(true);
res({ success: true, terminalOutput });
}
});

cp.onExit((code) => {
if (code === 0) {
res(true);
} else if (code >= 128) {
if (code >= 128) {
process.exit(code);
} else {
res(false);
res({ success: code === 0, terminalOutput });
}
});
});
}

return nodeProcess(commandConfig, color, cwd, env, readyWhen);
return nodeProcess(commandConfig, cwd, env, readyWhen, streamOutput);
}

function nodeProcess(
Expand All @@ -289,11 +326,12 @@ function nodeProcess(
bgColor?: string;
prefix?: string;
},
color: boolean,
cwd: string,
env: Record<string, string>,
readyWhen: string
): Promise<boolean> {
readyWhen: string,
streamOutput = true
): Promise<{ success: boolean; terminalOutput: string }> {
let terminalOutput = '';
return new Promise((res) => {
const childProcess = exec(commandConfig.command, {
maxBuffer: LARGE_BUFFER,
Expand All @@ -312,24 +350,36 @@ function nodeProcess(
process.on('SIGQUIT', processExitListener);

childProcess.stdout.on('data', (data) => {
process.stdout.write(addColorAndPrefix(data, commandConfig));
const output = addColorAndPrefix(data, commandConfig);
terminalOutput += output;
if (streamOutput) {
process.stdout.write(output);
}
if (readyWhen && data.toString().indexOf(readyWhen) > -1) {
res(true);
res({ success: true, terminalOutput });
}
});
childProcess.stderr.on('data', (err) => {
process.stderr.write(addColorAndPrefix(err, commandConfig));
const output = addColorAndPrefix(err, commandConfig);
terminalOutput += output;
if (streamOutput) {
process.stderr.write(output);
}
if (readyWhen && err.toString().indexOf(readyWhen) > -1) {
res(true);
res({ success: true, terminalOutput });
}
});
childProcess.on('error', (err) => {
process.stderr.write(addColorAndPrefix(err.toString(), commandConfig));
res(false);
const ouptput = addColorAndPrefix(err.toString(), commandConfig);
terminalOutput += ouptput;
if (streamOutput) {
process.stderr.write(ouptput);
}
res({ success: false, terminalOutput });
});
childProcess.on('exit', (code) => {
if (!readyWhen) {
res(code === 0);
res({ success: code === 0, terminalOutput });
}
});
});
Expand Down Expand Up @@ -370,11 +420,13 @@ function calculateCwd(
}

function processEnv(color: boolean, cwd: string, env: Record<string, string>) {
const localEnv = appendLocalEnv({ cwd: cwd ?? process.cwd() });
const res = {
...process.env,
...appendLocalEnv({ cwd: cwd ?? process.cwd() }),
...localEnv,
...env,
};
res.PATH = localEnv.PATH; // need to override PATH to make sure we are using the local node_modules

if (color) {
res.FORCE_COLOR = `${color}`;
Expand All @@ -389,7 +441,7 @@ export function interpolateArgsIntoCommand(
'args' | 'parsedArgs' | '__unparsed__' | 'unknownOptions'
>,
forwardAllArgs: boolean
) {
): string {
if (command.indexOf('{args.') > -1) {
const regex = /{args\.([^}]+)}/g;
return command.replace(regex, (_, group: string) =>
Expand Down
4 changes: 2 additions & 2 deletions packages/nx/src/hasher/hash-task.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ export async function hashTasksThatDoNotDependOnOutputsOfOtherTasks(
const tasks = Object.values(taskGraph.tasks);
const tasksWithHashers = await Promise.all(
tasks.map(async (task) => {
const customHasher = await getCustomHasher(task, projectGraph);
const customHasher = getCustomHasher(task, projectGraph);
return { task, customHasher };
})
);
Expand Down Expand Up @@ -56,7 +56,7 @@ export async function hashTask(
env: NodeJS.ProcessEnv
) {
performance.mark('hashSingleTask:start');
const customHasher = await getCustomHasher(task, projectGraph);
const customHasher = getCustomHasher(task, projectGraph);
const projectsConfigurations =
readProjectsConfigurationFromProjectGraph(projectGraph);
const { value, details } = await (customHasher
Expand Down
1 change: 0 additions & 1 deletion packages/nx/src/tasks-runner/default-tasks-runner.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import { TasksRunner, TaskStatus } from './tasks-runner';
import { TaskOrchestrator } from './task-orchestrator';
import { performance } from 'perf_hooks';
import { TaskHasher } from '../hasher/task-hasher';
import { LifeCycle } from './life-cycle';
import { ProjectGraph } from '../config/project-graph';
Expand Down
4 changes: 2 additions & 2 deletions packages/nx/src/tasks-runner/forked-process-task-runner.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@ import {
} from './batch/batch-messages';
import { stripIndents } from '../utils/strip-indents';
import { Task, TaskGraph } from '../config/task-graph';
import { Readable, Transform } from 'stream';
import { ChildProcess as NativeChildProcess, nxFork } from '../native';
import { Transform } from 'stream';
import { nxFork } from '../native';
import { PsuedoIPCServer } from './psuedo-ipc';
import { FORKED_PROCESS_OS_SOCKET_PATH } from '../daemon/socket-utils';
import { PseudoTtyProcess } from '../utils/child-process';
Expand Down

0 comments on commit 754ddf0

Please sign in to comment.