Skip to content

Commit

Permalink
Send options to worker process over IPC
Browse files Browse the repository at this point in the history
In large projects, the options may be too big to be passed through the
process arguments. Fixes #2032.
  • Loading branch information
novemberborn committed Feb 5, 2019
1 parent 010914b commit 3078892
Show file tree
Hide file tree
Showing 5 changed files with 122 additions and 105 deletions.
7 changes: 6 additions & 1 deletion lib/fork.js
Expand Up @@ -50,7 +50,7 @@ module.exports = (file, opts, execArgv) => {
}
}, opts);

const args = [JSON.stringify(opts), opts.color ? '--color' : '--no-color'].concat(opts.workerArgv);
const args = [opts.color ? '--color' : '--no-color'].concat(opts.workerArgv);

const subprocess = childProcess.fork(workerPath, args, {
cwd: opts.projectDir,
Expand Down Expand Up @@ -85,6 +85,11 @@ module.exports = (file, opts, execArgv) => {
return;
}

if (message.ava.type === 'ready-for-options') {
send({type: 'options', options: opts});
return;
}

if (message.ava.type === 'ping') {
send({type: 'pong'});
} else {
Expand Down
4 changes: 1 addition & 3 deletions lib/worker/consume-argv.js
@@ -1,5 +1,3 @@
'use strict';
require('./options').set(JSON.parse(process.argv[2]));

// Remove arguments received from fork.js and leave those specified by the user.
process.argv.splice(2, 2);
process.argv.splice(2, 1);
4 changes: 4 additions & 0 deletions lib/worker/ipc.js
Expand Up @@ -12,6 +12,9 @@ process.on('message', message => {
}

switch (message.ava.type) {
case 'options':
emitter.emit('options', message.ava.options);
break;
case 'peer-failed':
emitter.emit('peerFailed');
break;
Expand All @@ -23,6 +26,7 @@ process.on('message', message => {
}
});

exports.options = emitter.once('options');
exports.peerFailed = emitter.once('peerFailed');

function send(evt) {
Expand Down
204 changes: 108 additions & 96 deletions lib/worker/subprocess.js
Expand Up @@ -5,126 +5,138 @@ const currentlyUnhandled = require('currently-unhandled')();
require('./ensure-forked');
require('./load-chalk');
require('./consume-argv');
require('./fake-tty');
/* eslint-enable import/no-unassigned-import */

const nowAndTimers = require('../now-and-timers');
const Runner = require('../runner');
const serializeError = require('../serialize-error');
const dependencyTracking = require('./dependency-tracker');
const ipc = require('./ipc');
const options = require('./options').get();
const precompilerHook = require('./precompiler-hook');

function exit(code) {
if (!process.exitCode) {
process.exitCode = code;
}
ipc.send({type: 'ready-for-options'});
ipc.options.then(options => {
require('./options').set(options);
require('./fake-tty'); // eslint-disable-line import/no-unassigned-import

dependencyTracking.flush();
return ipc.flush().then(() => process.exit()); // eslint-disable-line unicorn/no-process-exit
}

const runner = new Runner({
failFast: options.failFast,
failWithoutAssertions: options.failWithoutAssertions,
file: options.file,
match: options.match,
projectDir: options.projectDir,
runOnlyExclusive: options.runOnlyExclusive,
serial: options.serial,
snapshotDir: options.snapshotDir,
updateSnapshots: options.updateSnapshots
});
const nowAndTimers = require('../now-and-timers');
const Runner = require('../runner');
const serializeError = require('../serialize-error');
const dependencyTracking = require('./dependency-tracker');
const precompilerHook = require('./precompiler-hook');

ipc.peerFailed.then(() => {
runner.interrupt();
});
function exit(code) {
if (!process.exitCode) {
process.exitCode = code;
}

const attributedRejections = new Set();
process.on('unhandledRejection', (reason, promise) => {
if (runner.attributeLeakedError(reason)) {
attributedRejections.add(promise);
dependencyTracking.flush();
return ipc.flush().then(() => process.exit()); // eslint-disable-line unicorn/no-process-exit
}
});

runner.on('dependency', dependencyTracking.track);
runner.on('stateChange', state => ipc.send(state));
const runner = new Runner({
failFast: options.failFast,
failWithoutAssertions: options.failWithoutAssertions,
file: options.file,
match: options.match,
projectDir: options.projectDir,
runOnlyExclusive: options.runOnlyExclusive,
serial: options.serial,
snapshotDir: options.snapshotDir,
updateSnapshots: options.updateSnapshots
});

runner.on('error', error => {
ipc.send({type: 'internal-error', err: serializeError('Internal runner error', false, error)});
exit(1);
});
ipc.peerFailed.then(() => {
runner.interrupt();
});

runner.on('finish', () => {
try {
const touchedFiles = runner.saveSnapshotState();
if (touchedFiles) {
ipc.send({type: 'touched-files', files: touchedFiles});
const attributedRejections = new Set();
process.on('unhandledRejection', (reason, promise) => {
if (runner.attributeLeakedError(reason)) {
attributedRejections.add(promise);
}
} catch (error) {
});

runner.on('dependency', dependencyTracking.track);
runner.on('stateChange', state => ipc.send(state));

runner.on('error', error => {
ipc.send({type: 'internal-error', err: serializeError('Internal runner error', false, error)});
exit(1);
return;
}
});

nowAndTimers.setImmediate(() => {
currentlyUnhandled()
.filter(rejection => !attributedRejections.has(rejection.promise))
.forEach(rejection => {
ipc.send({type: 'unhandled-rejection', err: serializeError('Unhandled rejection', true, rejection.reason)});
});
runner.on('finish', () => {
try {
const touchedFiles = runner.saveSnapshotState();
if (touchedFiles) {
ipc.send({type: 'touched-files', files: touchedFiles});
}
} catch (error) {
ipc.send({type: 'internal-error', err: serializeError('Internal runner error', false, error)});
exit(1);
return;
}

exit(0);
});
});
nowAndTimers.setImmediate(() => {
currentlyUnhandled()
.filter(rejection => !attributedRejections.has(rejection.promise))
.forEach(rejection => {
ipc.send({type: 'unhandled-rejection', err: serializeError('Unhandled rejection', true, rejection.reason)});
});

process.on('uncaughtException', error => {
if (runner.attributeLeakedError(error)) {
return;
}
exit(0);
});
});

ipc.send({type: 'uncaught-exception', err: serializeError('Uncaught exception', true, error)});
exit(1);
});
process.on('uncaughtException', error => {
if (runner.attributeLeakedError(error)) {
return;
}

let accessedRunner = false;
exports.getRunner = () => {
accessedRunner = true;
return runner;
};
ipc.send({type: 'uncaught-exception', err: serializeError('Uncaught exception', true, error)});
exit(1);
});

// Store value in case to prevent required modules from modifying it.
const testPath = options.file;
let accessedRunner = false;
exports.getRunner = () => {
accessedRunner = true;
return runner;
};

// Install before processing options.require, so if helpers are added to the
// require configuration the *compiled* helper will be loaded.
dependencyTracking.install(testPath);
precompilerHook.install();
// Store value in case to prevent required modules from modifying it.
const testPath = options.file;

try {
for (const mod of (options.require || [])) {
const required = require(mod);
// Install before processing options.require, so if helpers are added to the
// require configuration the *compiled* helper will be loaded.
dependencyTracking.install(testPath);
precompilerHook.install();

try {
if (required[Symbol.for('esm\u200D:package')]) {
require = required(module); // eslint-disable-line no-global-assign
}
} catch (_) {}
}
try {
for (const mod of (options.require || [])) {
const required = require(mod);

try {
if (required[Symbol.for('esm\u200D:package')]) {
require = required(module); // eslint-disable-line no-global-assign
}
} catch (_) {}
}

require(testPath);
require(testPath);

if (accessedRunner) {
// Unreference the IPC channel if the test file required AVA. This stops it
// from keeping the event loop busy, which means the `beforeExit` event can be
// used to detect when tests stall.
ipc.unref();
} else {
ipc.send({type: 'missing-ava-import'});
if (accessedRunner) {
// Unreference the IPC channel if the test file required AVA. This stops it
// from keeping the event loop busy, which means the `beforeExit` event can be
// used to detect when tests stall.
ipc.unref();
} else {
ipc.send({type: 'missing-ava-import'});
exit(1);
}
} catch (error) {
ipc.send({type: 'uncaught-exception', err: serializeError('Uncaught exception', true, error)});
exit(1);
}
} catch (error) {
ipc.send({type: 'uncaught-exception', err: serializeError('Uncaught exception', true, error)});
exit(1);
}
}).catch(error => {
// There shouldn't be any errors, but if there are we may not have managed
// to bootstrap enough code to serialize them. Re-throw and let the process
// crash.
setImmediate(() => {
throw error;
});
});
8 changes: 3 additions & 5 deletions profile.js
Expand Up @@ -116,7 +116,9 @@ runStatus.observeWorker({
process.send = data => {
if (data && data.ava) {
const evt = data.ava;
if (evt.type === 'ping') {
if (evt.type === 'ready-for-options') {
process.emit('message', {ava: {type: 'options', options: opts}});
} else if (evt.type === 'ping') {
if (console.profileEnd) {
console.profileEnd();
}
Expand Down Expand Up @@ -152,10 +154,6 @@ process.on('beforeExit', () => {
process.exitCode = process.exitCode || runStatus.suggestExitCode({matching: false});
});

// The "subprocess" will read process.argv[2] for options
process.argv[2] = JSON.stringify(opts);
process.argv.length = 3;

if (console.profile) {
console.profile('AVA test-worker process');
}
Expand Down

0 comments on commit 3078892

Please sign in to comment.