Skip to content

Commit

Permalink
Workerfarm improvements (#1587)
Browse files Browse the repository at this point in the history
  • Loading branch information
DeMoorJasper authored and devongovett committed Jul 9, 2018
1 parent a42dfeb commit 7b38f4f
Show file tree
Hide file tree
Showing 12 changed files with 224 additions and 216 deletions.
12 changes: 7 additions & 5 deletions src/Bundler.js
Expand Up @@ -357,18 +357,20 @@ class Bundler extends EventEmitter {
this.farm = WorkerFarm.getShared(this.options);
}

stop() {
if (this.farm) {
this.farm.end();
}

async stop() {
if (this.watcher) {
this.watcher.stop();
}

if (this.hmr) {
this.hmr.stop();
}

// Watcher and hmr can cause workerfarm calls
// keep this as last to prevent unwanted errors
if (this.farm) {
await this.farm.end();
}
}

async getAsset(name, parent) {
Expand Down
6 changes: 3 additions & 3 deletions src/Logger.js
Expand Up @@ -5,6 +5,7 @@ const emoji = require('./utils/emoji');
const {countBreaks} = require('grapheme-breaker');
const stripAnsi = require('strip-ansi');
const ora = require('ora');
const WorkerFarm = require('./workerfarm/WorkerFarm');

class Logger {
constructor(options) {
Expand Down Expand Up @@ -192,12 +193,11 @@ function stringWidth(string) {
// If we are in a worker, make a proxy class which will
// send the logger calls to the main process via IPC.
// These are handled in WorkerFarm and directed to handleMessage above.
if (process.send && process.env.PARCEL_WORKER_TYPE === 'remote-worker') {
const worker = require('./worker');
if (WorkerFarm.isWorker()) {
class LoggerProxy {}
for (let method of Object.getOwnPropertyNames(Logger.prototype)) {
LoggerProxy.prototype[method] = (...args) => {
worker.addCall(
WorkerFarm.callMaster(
{
location: __filename,
method,
Expand Down
4 changes: 2 additions & 2 deletions src/utils/localRequire.js
@@ -1,7 +1,7 @@
const {dirname} = require('path');
const promisify = require('../utils/promisify');
const resolve = promisify(require('resolve'));
const worker = require('../worker');
const WorkerFarm = require('../workerfarm/WorkerFarm');

const cache = new Map();

Expand All @@ -14,7 +14,7 @@ async function localRequire(name, path, triedInstall = false) {
resolved = await resolve(name, {basedir}).then(([name]) => name);
} catch (e) {
if (e.code === 'MODULE_NOT_FOUND' && !triedInstall) {
await worker.addCall({
await WorkerFarm.callMaster({
location: require.resolve('./installPackage.js'),
args: [[name], path]
});
Expand Down
21 changes: 1 addition & 20 deletions src/worker.js
@@ -1,19 +1,11 @@
require('v8-compile-cache');
const Pipeline = require('./Pipeline');
const WorkerFarm = require('./workerfarm/WorkerFarm');

let pipeline;
let child;

function setChildReference(childReference) {
child = childReference;
}

function init(options) {
pipeline = new Pipeline(options || {});
Object.assign(process.env, options.env || {}, {
PARCEL_WORKER_TYPE: child ? 'remote-worker' : 'local-worker'
});
Object.assign(process.env, options.env || {});
process.env.HMR_PORT = options.hmrPort;
process.env.HMR_HOSTNAME = options.hmrHostname;
}
Expand All @@ -27,16 +19,5 @@ async function run(path, id, isWarmUp) {
}
}

// request.location is a module path relative to src or lib
async function addCall(request, awaitResponse = true) {
if (process.send && process.env.PARCEL_WORKER_TYPE === 'remote-worker') {
return child.addCall(request, awaitResponse);
} else {
return WorkerFarm.getShared().processRequest(request);
}
}

exports.init = init;
exports.run = run;
exports.addCall = addCall;
exports.setChildReference = setChildReference;
73 changes: 56 additions & 17 deletions src/workerfarm/Worker.js
Expand Up @@ -10,7 +10,7 @@ const childModule =
let WORKER_ID = 0;

class Worker extends EventEmitter {
constructor(forkModule, options) {
constructor(options) {
super();

this.options = options;
Expand All @@ -22,12 +22,13 @@ class Worker extends EventEmitter {
this.calls = new Map();
this.exitCode = null;
this.callId = 0;
this.stopped = false;

this.fork(forkModule);
this.ready = false;
this.stopped = false;
this.isStopping = false;
}

fork(forkModule) {
async fork(forkModule, bundlerOptions) {
let filteredArgs = process.execArgv.filter(
v => !/^--(debug|inspect)/.test(v)
);
Expand All @@ -39,11 +40,6 @@ class Worker extends EventEmitter {
};

this.child = childProcess.fork(childModule, process.argv, options);
this.send({
type: 'module',
module: forkModule,
child: this.id
});

this.child.on('message', this.receive.bind(this));

Expand All @@ -55,6 +51,36 @@ class Worker extends EventEmitter {
this.child.on('error', err => {
this.emit('error', err);
});

await new Promise((resolve, reject) => {
this.call({
method: 'childInit',
args: [forkModule],
retries: 0,
resolve,
reject
});
});

await this.init(bundlerOptions);
}

async init(bundlerOptions) {
this.ready = false;

return new Promise((resolve, reject) => {
this.call({
method: 'init',
args: [bundlerOptions],
retries: 0,
resolve: (...args) => {
this.ready = true;
this.emit('ready');
resolve(...args);
},
reject
});
});
}

send(data) {
Expand Down Expand Up @@ -84,6 +110,10 @@ class Worker extends EventEmitter {
}

call(call) {
if (this.stopped || this.isStopping) {
return;
}

let idx = this.callId++;
this.calls.set(idx, call);

Expand All @@ -97,7 +127,7 @@ class Worker extends EventEmitter {
}

receive(data) {
if (this.stopped) {
if (this.stopped || this.isStopping) {
return;
}

Expand Down Expand Up @@ -126,15 +156,24 @@ class Worker extends EventEmitter {
}
}

stop() {
this.stopped = true;
async stop() {
if (!this.stopped) {
this.stopped = true;

if (this.child) {
this.child.send('die');

this.send('die');
setTimeout(() => {
if (this.exitCode === null) {
this.child.kill('SIGKILL');
let forceKill = setTimeout(
() => this.child.kill('SIGINT'),
this.options.forcedKillTime
);
await new Promise(resolve => {
this.child.once('exit', resolve);
});

clearTimeout(forceKill);
}
}, this.options.forcedKillTime);
}
}
}

Expand Down

0 comments on commit 7b38f4f

Please sign in to comment.