Skip to content

Commit

Permalink
fix(node): add workaround for handling sub processes in node:node exe…
Browse files Browse the repository at this point in the history
…cutor (#10292)

In Nx 15, we plan to clean up runExecutor and implement proper process handling in runExecutor
upstream. This is a workaround that fix some racing-condition in node:node exexecutor downstream

ISSUES CLOSED: #9305
  • Loading branch information
nartc committed May 17, 2022
1 parent 4400942 commit 7b9b0cd
Showing 1 changed file with 35 additions and 11 deletions.
46 changes: 35 additions & 11 deletions packages/node/src/executors/node/node.impl.ts
Expand Up @@ -8,11 +8,14 @@ import {
} from '@nrwl/devkit';
import { calculateProjectDependencies } from '@nrwl/workspace/src/utilities/buildable-libs-utils';
import { ChildProcess, fork } from 'child_process';
import { HashingImpl } from 'nx/src/hasher/hashing-impl';
import * as treeKill from 'tree-kill';
import { promisify } from 'util';
import { InspectType, NodeExecutorOptions } from './schema';

let subProcess: ChildProcess = null;
const hasher = new HashingImpl();
const processMap = new Map<string, ChildProcess>();
const hashedMap = new Map<string[], string>();

export interface ExecutorEvent {
outfile: string;
Expand All @@ -24,15 +27,15 @@ export async function* nodeExecutor(
context: ExecutorContext
) {
process.on('SIGTERM', async () => {
await killCurrentProcess();
await killCurrentProcess(options, 'SIGTERM');
process.exit(128 + 15);
});
process.on('SIGINT', async () => {
await killCurrentProcess();
await killCurrentProcess(options, 'SIGINT');
process.exit(128 + 2);
});
process.on('SIGHUP', async () => {
await killCurrentProcess();
await killCurrentProcess(options, 'SIGHUP');
process.exit(128 + 1);
});

Expand Down Expand Up @@ -84,11 +87,16 @@ async function runProcess(
options: NodeExecutorOptions,
mappings: { [project: string]: string }
) {
subProcess = fork(
const execArgv = getExecArgv(options);

const hashed = hasher.hashArray(execArgv.concat(options.args));
hashedMap.set(options.args, hashed);

const subProcess = fork(
joinPathFragments(__dirname, 'node-with-require-overrides'),
options.args,
{
execArgv: getExecArgv(options),
execArgv,
stdio: 'inherit',
env: {
...process.env,
Expand All @@ -98,6 +106,8 @@ async function runProcess(
}
);

processMap.set(hashed, subProcess);

if (!options.watch) {
return new Promise<void>((resolve, reject) => {
subProcess.on('exit', (code) => {
Expand Down Expand Up @@ -136,7 +146,7 @@ async function handleBuildEvent(
) {
// Don't kill previous run unless new build is successful.
if (options.watch && event.success) {
await killCurrentProcess();
await killCurrentProcess(options);
}

if (event.success) {
Expand All @@ -147,11 +157,24 @@ async function handleBuildEvent(
const promisifiedTreeKill: (pid: number, signal: string) => Promise<void> =
promisify(treeKill);

async function killCurrentProcess() {
if (!subProcess) return;
async function killCurrentProcess(
options: NodeExecutorOptions,
signal: string = 'SIGTERM'
) {
const currentProcessKey = hashedMap.get(options.args);
if (!currentProcessKey) return;

const currentProcess = processMap.get(currentProcessKey);
if (!currentProcess) return;

try {
await promisifiedTreeKill(subProcess.pid, 'SIGTERM');
await promisifiedTreeKill(currentProcess.pid, signal);

// if the currentProcess.killed is false, invoke kill()
// to properly send the signal to the process
if (!currentProcess.killed) {
currentProcess.kill(signal as NodeJS.Signals);
}
} catch (err) {
if (Array.isArray(err) && err[0] && err[2]) {
const errorMessage = err[2];
Expand All @@ -160,7 +183,8 @@ async function killCurrentProcess() {
logger.error(err.message);
}
} finally {
subProcess = null;
processMap.delete(currentProcessKey);
hashedMap.delete(options.args);
}
}

Expand Down

0 comments on commit 7b9b0cd

Please sign in to comment.