diff --git a/src/Bundler.js b/src/Bundler.js index d27d9ed850f..f6bbd204c86 100644 --- a/src/Bundler.js +++ b/src/Bundler.js @@ -357,11 +357,7 @@ class Bundler extends EventEmitter { this.farm = WorkerFarm.getShared(this.options); } - stop() { - if (this.farm) { - this.farm.end(); - } - + async stop() { if (this.watcher) { this.watcher.stop(); } @@ -369,6 +365,12 @@ class Bundler extends EventEmitter { if (this.hmr) { this.hmr.stop(); } + + // Watcher and hmr can cause workerfarm calls + // keep this as last to prevent unwanted errors + if (this.farm) { + await this.farm.end(); + } } async getAsset(name, parent) { diff --git a/src/Logger.js b/src/Logger.js index c575bf79108..eb9d4942aad 100644 --- a/src/Logger.js +++ b/src/Logger.js @@ -5,6 +5,7 @@ const emoji = require('./utils/emoji'); const {countBreaks} = require('grapheme-breaker'); const stripAnsi = require('strip-ansi'); const ora = require('ora'); +const WorkerFarm = require('./workerfarm/WorkerFarm'); class Logger { constructor(options) { @@ -192,12 +193,11 @@ function stringWidth(string) { // If we are in a worker, make a proxy class which will // send the logger calls to the main process via IPC. // These are handled in WorkerFarm and directed to handleMessage above. -if (process.send && process.env.PARCEL_WORKER_TYPE === 'remote-worker') { - const worker = require('./worker'); +if (WorkerFarm.isWorker()) { class LoggerProxy {} for (let method of Object.getOwnPropertyNames(Logger.prototype)) { LoggerProxy.prototype[method] = (...args) => { - worker.addCall( + WorkerFarm.callMaster( { location: __filename, method, diff --git a/src/utils/localRequire.js b/src/utils/localRequire.js index 64f22a0c986..a7e2f1a3fb0 100644 --- a/src/utils/localRequire.js +++ b/src/utils/localRequire.js @@ -1,7 +1,7 @@ const {dirname} = require('path'); const promisify = require('../utils/promisify'); const resolve = promisify(require('resolve')); -const worker = require('../worker'); +const WorkerFarm = require('../workerfarm/WorkerFarm'); const cache = new Map(); @@ -14,7 +14,7 @@ async function localRequire(name, path, triedInstall = false) { resolved = await resolve(name, {basedir}).then(([name]) => name); } catch (e) { if (e.code === 'MODULE_NOT_FOUND' && !triedInstall) { - await worker.addCall({ + await WorkerFarm.callMaster({ location: require.resolve('./installPackage.js'), args: [[name], path] }); diff --git a/src/worker.js b/src/worker.js index cb607f58951..c33d51cb63a 100644 --- a/src/worker.js +++ b/src/worker.js @@ -1,19 +1,11 @@ require('v8-compile-cache'); const Pipeline = require('./Pipeline'); -const WorkerFarm = require('./workerfarm/WorkerFarm'); let pipeline; -let child; - -function setChildReference(childReference) { - child = childReference; -} function init(options) { pipeline = new Pipeline(options || {}); - Object.assign(process.env, options.env || {}, { - PARCEL_WORKER_TYPE: child ? 'remote-worker' : 'local-worker' - }); + Object.assign(process.env, options.env || {}); process.env.HMR_PORT = options.hmrPort; process.env.HMR_HOSTNAME = options.hmrHostname; } @@ -27,16 +19,5 @@ async function run(path, id, isWarmUp) { } } -// request.location is a module path relative to src or lib -async function addCall(request, awaitResponse = true) { - if (process.send && process.env.PARCEL_WORKER_TYPE === 'remote-worker') { - return child.addCall(request, awaitResponse); - } else { - return WorkerFarm.getShared().processRequest(request); - } -} - exports.init = init; exports.run = run; -exports.addCall = addCall; -exports.setChildReference = setChildReference; diff --git a/src/workerfarm/Worker.js b/src/workerfarm/Worker.js index 4543e226fab..8c9524984ae 100644 --- a/src/workerfarm/Worker.js +++ b/src/workerfarm/Worker.js @@ -10,7 +10,7 @@ const childModule = let WORKER_ID = 0; class Worker extends EventEmitter { - constructor(forkModule, options) { + constructor(options) { super(); this.options = options; @@ -22,12 +22,13 @@ class Worker extends EventEmitter { this.calls = new Map(); this.exitCode = null; this.callId = 0; - this.stopped = false; - this.fork(forkModule); + this.ready = false; + this.stopped = false; + this.isStopping = false; } - fork(forkModule) { + async fork(forkModule, bundlerOptions) { let filteredArgs = process.execArgv.filter( v => !/^--(debug|inspect)/.test(v) ); @@ -39,11 +40,6 @@ class Worker extends EventEmitter { }; this.child = childProcess.fork(childModule, process.argv, options); - this.send({ - type: 'module', - module: forkModule, - child: this.id - }); this.child.on('message', this.receive.bind(this)); @@ -55,6 +51,36 @@ class Worker extends EventEmitter { this.child.on('error', err => { this.emit('error', err); }); + + await new Promise((resolve, reject) => { + this.call({ + method: 'childInit', + args: [forkModule], + retries: 0, + resolve, + reject + }); + }); + + await this.init(bundlerOptions); + } + + async init(bundlerOptions) { + this.ready = false; + + return new Promise((resolve, reject) => { + this.call({ + method: 'init', + args: [bundlerOptions], + retries: 0, + resolve: (...args) => { + this.ready = true; + this.emit('ready'); + resolve(...args); + }, + reject + }); + }); } send(data) { @@ -84,6 +110,10 @@ class Worker extends EventEmitter { } call(call) { + if (this.stopped || this.isStopping) { + return; + } + let idx = this.callId++; this.calls.set(idx, call); @@ -97,7 +127,7 @@ class Worker extends EventEmitter { } receive(data) { - if (this.stopped) { + if (this.stopped || this.isStopping) { return; } @@ -126,15 +156,24 @@ class Worker extends EventEmitter { } } - stop() { - this.stopped = true; + async stop() { + if (!this.stopped) { + this.stopped = true; + + if (this.child) { + this.child.send('die'); - this.send('die'); - setTimeout(() => { - if (this.exitCode === null) { - this.child.kill('SIGKILL'); + let forceKill = setTimeout( + () => this.child.kill('SIGINT'), + this.options.forcedKillTime + ); + await new Promise(resolve => { + this.child.once('exit', resolve); + }); + + clearTimeout(forceKill); } - }, this.options.forcedKillTime); + } } } diff --git a/src/workerfarm/WorkerFarm.js b/src/workerfarm/WorkerFarm.js index ff0ec72a2a4..316c9942450 100644 --- a/src/workerfarm/WorkerFarm.js +++ b/src/workerfarm/WorkerFarm.js @@ -11,7 +11,7 @@ class WorkerFarm extends EventEmitter { { maxConcurrentWorkers: WorkerFarm.getNumWorkers(), maxConcurrentCallsPerWorker: WorkerFarm.getConcurrentCallsPerWorker(), - forcedKillTime: 100, + forcedKillTime: 500, warmWorkers: true, useLocalWorker: true, workerPath: '../worker' @@ -19,9 +19,8 @@ class WorkerFarm extends EventEmitter { farmOptions ); - this.started = false; this.warmWorkers = 0; - this.children = new Map(); + this.workers = new Map(); this.callQueue = []; this.localWorker = require(this.options.workerPath); @@ -31,24 +30,35 @@ class WorkerFarm extends EventEmitter { } warmupWorker(method, args) { - // Workers have started, but are not warmed up yet. + // Workers are already stopping + if (this.ending) { + return; + } + + // Workers are not warmed up yet. // Send the job to a remote worker in the background, // but use the result from the local worker - it will be faster. - if (this.started) { - let promise = this.addCall(method, [...args, true]); - if (promise) { - promise - .then(() => { - this.warmWorkers++; - if (this.warmWorkers >= this.children.size) { - this.emit('warmedup'); - } - }) - .catch(() => {}); - } + let promise = this.addCall(method, [...args, true]); + if (promise) { + promise + .then(() => { + this.warmWorkers++; + if (this.warmWorkers >= this.workers.size) { + this.emit('warmedup'); + } + }) + .catch(() => {}); } } + shouldStartRemoteWorkers() { + return ( + this.options.maxConcurrentWorkers > 1 || + process.env.NODE_ENV === 'test' || + !this.options.useLocalWorker + ); + } + mkhandle(method) { return function(...args) { // Child process workers are slow to start (~600ms). @@ -57,7 +67,7 @@ class WorkerFarm extends EventEmitter { if (this.shouldUseRemoteWorkers()) { return this.addCall(method, [...args, false]); } else { - if (this.options.warmWorkers) { + if (this.options.warmWorkers && this.shouldStartRemoteWorkers()) { this.warmupWorker(method, args); } @@ -66,82 +76,74 @@ class WorkerFarm extends EventEmitter { }.bind(this); } - onError(error, childId) { + onError(error, worker) { // Handle ipc errors if (error.code === 'ERR_IPC_CHANNEL_CLOSED') { - return this.stopChild(childId); + return this.stopWorker(worker); } } - onExit(childId) { - // delay this to give any sends a chance to finish - setTimeout(() => { - let doQueue = false; - let child = this.children.get(childId); - if (child && child.calls.size) { - for (let call of child.calls.values()) { - call.retries++; - this.callQueue.unshift(call); - doQueue = true; - } - } - this.stopChild(childId); - if (doQueue) { - this.processQueue(); - } - }, 10); - } - startChild() { - let worker = new Worker(this.options.workerPath, this.options); + let worker = new Worker(this.options); - worker.on('request', data => { - this.processRequest(data, worker); - }); + worker.fork(this.options.workerPath, this.bundlerOptions); - worker.on('response', () => { - // allow any outstanding calls to be processed - this.processQueue(); - }); + worker.on('request', data => this.processRequest(data, worker)); - worker.once('exit', () => { - this.onExit(worker.id); - }); + worker.on('ready', () => this.processQueue()); + worker.on('response', () => this.processQueue()); - worker.on('error', err => { - this.onError(err, worker.id); - }); + worker.on('error', err => this.onError(err, worker)); + worker.once('exit', () => this.stopWorker(worker)); - this.children.set(worker.id, worker); + this.workers.set(worker.id, worker); } - stopChild(childId) { - let child = this.children.get(childId); - if (child) { - child.stop(); - this.children.delete(childId); + async stopWorker(worker) { + if (!worker.stopped) { + this.workers.delete(worker.id); + + worker.isStopping = true; + + if (worker.calls.size) { + for (let call of worker.calls.values()) { + call.retries++; + this.callQueue.unshift(call); + } + } + + worker.calls = null; + + await worker.stop(); + + // Process any requests that failed and start a new worker + this.processQueue(); } } async processQueue() { if (this.ending || !this.callQueue.length) return; - if (this.children.size < this.options.maxConcurrentWorkers) { + if (this.workers.size < this.options.maxConcurrentWorkers) { this.startChild(); } - for (let child of this.children.values()) { + for (let worker of this.workers.values()) { if (!this.callQueue.length) { break; } - if (child.calls.size < this.options.maxConcurrentCallsPerWorker) { - child.call(this.callQueue.shift()); + if (!worker.ready || worker.stopped || worker.isStopping) { + continue; + } + + if (worker.calls.size < this.options.maxConcurrentCallsPerWorker) { + worker.call(this.callQueue.shift()); } } } - async processRequest(data, child = false) { + async processRequest(data, worker = false) { let result = { idx: data.idx, type: 'response' @@ -170,8 +172,8 @@ class WorkerFarm extends EventEmitter { } if (awaitResponse) { - if (child) { - child.send(result); + if (worker) { + worker.send(result); } else { return result; } @@ -179,7 +181,9 @@ class WorkerFarm extends EventEmitter { } addCall(method, args) { - if (this.ending) return; // don't add anything new to the queue + if (this.ending) { + throw new Error('Cannot add a worker call if workerfarm is ending.'); + } return new Promise((resolve, reject) => { this.callQueue.push({ @@ -195,59 +199,47 @@ class WorkerFarm extends EventEmitter { async end() { this.ending = true; - for (let childId of this.children.keys()) { - this.stopChild(childId); - } + await Promise.all( + Array.from(this.workers.values()).map(worker => this.stopWorker(worker)) + ); this.ending = false; shared = null; } - init(options) { - this.localWorker.init(options); - this.initRemoteWorkers(options); - } - - async initRemoteWorkers(options) { - this.started = false; - this.warmWorkers = 0; + init(bundlerOptions) { + this.bundlerOptions = bundlerOptions; - // Start workers if there isn't enough workers already - for ( - let i = this.children.size; - i < this.options.maxConcurrentWorkers; - i++ - ) { - this.startChild(); + if (this.shouldStartRemoteWorkers()) { + this.persistBundlerOptions(); } - // Reliable way of initialising workers - let promises = []; - for (let child of this.children.values()) { - promises.push( - new Promise((resolve, reject) => { - child.call({ - method: 'init', - args: [options], - retries: 0, - resolve, - reject - }); - }) - ); + this.localWorker.init(bundlerOptions); + this.startMaxWorkers(); + } + + persistBundlerOptions() { + for (let worker of this.workers.values()) { + worker.init(this.bundlerOptions); } + } - await Promise.all(promises); - if (this.options.maxConcurrentWorkers > 0) { - this.started = true; - this.emit('started'); + startMaxWorkers() { + // Starts workers untill the maximum is reached + if (this.workers.size < this.options.maxConcurrentWorkers) { + for ( + let i = 0; + i < this.options.maxConcurrentWorkers - this.workers.size; + i++ + ) { + this.startChild(); + } } } shouldUseRemoteWorkers() { return ( !this.options.useLocalWorker || - (this.started && - (this.warmWorkers >= this.children.size || !this.options.warmWorkers)) + (this.warmWorkers >= this.workers.size || !this.options.warmWorkers) ); } @@ -258,6 +250,10 @@ class WorkerFarm extends EventEmitter { shared.init(options); } + if (!shared && !options) { + throw new Error('Workerfarm should be initialised using options'); + } + return shared; } @@ -275,8 +271,21 @@ class WorkerFarm extends EventEmitter { return cores || 1; } + static callMaster(request, awaitResponse = true) { + if (WorkerFarm.isWorker()) { + const child = require('./child'); + return child.addCall(request, awaitResponse); + } else { + return WorkerFarm.getShared().processRequest(request); + } + } + + static isWorker() { + return process.send && require.main.filename === require.resolve('./child'); + } + static getConcurrentCallsPerWorker() { - return parseInt(process.env.PARCEL_MAX_CONCURRENT_CALLS) || 10; + return parseInt(process.env.PARCEL_MAX_CONCURRENT_CALLS) || 5; } } diff --git a/src/workerfarm/child.js b/src/workerfarm/child.js index 69c4a16181c..9483779ac95 100644 --- a/src/workerfarm/child.js +++ b/src/workerfarm/child.js @@ -2,6 +2,10 @@ const errorUtils = require('./errorUtils'); class Child { constructor() { + if (!process.send) { + throw new Error('Only create Child instances in a worker!'); + } + this.module = undefined; this.childId = undefined; @@ -9,8 +13,6 @@ class Child { this.responseQueue = new Map(); this.responseId = 0; this.maxConcurrentCalls = 10; - - process.env.PARCEL_WORKER_TYPE = 'remote-worker'; } messageListener(data) { @@ -18,15 +20,6 @@ class Child { return this.end(); } - if (data.type === 'module' && data.module && !this.module) { - this.module = require(data.module); - this.childId = data.child; - if (this.module.setChildReference) { - this.module.setChildReference(this); - } - return; - } - let type = data.type; if (type === 'response') { return this.handleResponse(data); @@ -47,6 +40,11 @@ class Child { }); } + childInit(module, childId) { + this.module = require(module); + this.childId = childId; + } + async handleRequest(data) { let idx = data.idx; let child = data.child; @@ -56,7 +54,11 @@ class Child { let result = {idx, child, type: 'response'}; try { result.contentType = 'data'; - result.content = await this.module[method](...args); + if (method === 'childInit') { + result.content = this.childInit(...args, child); + } else { + result.content = await this.module[method](...args); + } } catch (e) { result.contentType = 'error'; result.content = errorUtils.errorToJson(e); @@ -132,7 +134,7 @@ class Child { } end() { - return process.exit(0); + process.exit(); } } diff --git a/test/hmr.js b/test/hmr.js index 69ff4d77bfb..776aae39cf0 100644 --- a/test/hmr.js +++ b/test/hmr.js @@ -13,26 +13,24 @@ describe('hmr', function() { await rimraf(__dirname + '/input'); }); - afterEach(function(done) { + afterEach(async function() { stub.restore(); - let finalise = () => { + let finalise = async () => { if (b) { - b.stop(); + await b.stop(); b = null; - - done(); } }; if (ws) { ws.close(); - ws.onclose = () => { - ws = null; - finalise(); - }; - } else { - finalise(); + await new Promise(resolve => { + ws.onclose = resolve; + }); + ws = null; } + + await finalise(); }); function nextEvent(emitter, event) { diff --git a/test/integration/plugins/node_modules/parcel-plugin-test/TextAsset.js b/test/integration/plugins/node_modules/parcel-plugin-test/TextAsset.js index 111f65dbb4e..23cb69037c6 100644 --- a/test/integration/plugins/node_modules/parcel-plugin-test/TextAsset.js +++ b/test/integration/plugins/node_modules/parcel-plugin-test/TextAsset.js @@ -1,3 +1,8 @@ +// Hacky fix to make node 6 tests pass +if (parseInt(process.versions.node, 10) < 8) { + require("babel-register"); +} + const {Asset} = require('../../../../../src/Bundler'); class TextAsset extends Asset { diff --git a/test/integration/workerfarm/ipc-pid.js b/test/integration/workerfarm/ipc-pid.js index 43df6a98dc2..3508c2eef79 100644 --- a/test/integration/workerfarm/ipc-pid.js +++ b/test/integration/workerfarm/ipc-pid.js @@ -1,14 +1,9 @@ -let options = {}; -let child; - -function setChildReference(childRef) { - child = childRef; -} +const WorkerFarm = require(`../../../${parseInt(process.versions.node, 10) < 8 ? 'lib' : 'src'}/workerfarm/WorkerFarm`); function run() { let result = [process.pid]; return new Promise((resolve, reject) => { - child.addCall({ + WorkerFarm.callMaster({ location: require.resolve('./master-process-id.js'), args: [] }).then((pid) => { @@ -23,5 +18,4 @@ function init() { } exports.run = run; -exports.init = init; -exports.setChildReference = setChildReference; \ No newline at end of file +exports.init = init; \ No newline at end of file diff --git a/test/integration/workerfarm/ipc.js b/test/integration/workerfarm/ipc.js index 57041cad4fc..bbaff78edcd 100644 --- a/test/integration/workerfarm/ipc.js +++ b/test/integration/workerfarm/ipc.js @@ -1,16 +1,9 @@ -let options = {}; -let child; - -function setChildReference(childRef) { - child = childRef; -} +const WorkerFarm = require(`../../../${parseInt(process.versions.node, 10) < 8 ? 'lib' : 'src'}/workerfarm/WorkerFarm`); function run(a, b) { - return new Promise((resolve, reject) => { - child.addCall({ - location: require.resolve('./master-sum.js'), - args: [a, b] - }).then(resolve).catch(reject); + return WorkerFarm.callMaster({ + location: require.resolve('./master-sum.js'), + args: [a, b] }); } @@ -19,5 +12,4 @@ function init() { } exports.run = run; -exports.init = init; -exports.setChildReference = setChildReference; \ No newline at end of file +exports.init = init; \ No newline at end of file diff --git a/test/workerfarm.js b/test/workerfarm.js index 001249b0219..2e4086d0a90 100644 --- a/test/workerfarm.js +++ b/test/workerfarm.js @@ -12,8 +12,6 @@ describe('WorkerFarm', () => { } ); - await new Promise(resolve => workerfarm.once('started', resolve)); - assert.equal(await workerfarm.run(), 'pong'); await workerfarm.end(); @@ -29,8 +27,6 @@ describe('WorkerFarm', () => { } ); - await new Promise(resolve => workerfarm.once('started', resolve)); - let promises = []; for (let i = 0; i < 1000; i++) { promises.push(workerfarm.run(i)); @@ -54,8 +50,8 @@ describe('WorkerFarm', () => { for (let i = 0; i < 100; i++) { options.key = i; workerfarm.init(options); - await new Promise(resolve => workerfarm.once('started', resolve)); - for (let i = 0; i < workerfarm.children.size; i++) { + + for (let i = 0; i < workerfarm.workers.size; i++) { assert.equal((await workerfarm.run()).key, options.key); } assert.equal(workerfarm.shouldUseRemoteWorkers(), true); @@ -74,17 +70,15 @@ describe('WorkerFarm', () => { } ); - await new Promise(resolve => workerfarm.once('started', resolve)); - for (let i = 0; i < 100; i++) { assert.equal(await workerfarm.run(i), i); } await new Promise(resolve => workerfarm.once('warmedup', resolve)); - assert(workerfarm.children.size > 0, 'Should have spawned workers.'); + assert(workerfarm.workers.size > 0, 'Should have spawned workers.'); assert( - workerfarm.warmWorkers >= workerfarm.children.size, + workerfarm.warmWorkers >= workerfarm.workers.size, 'Should have warmed up workers.' ); @@ -117,8 +111,6 @@ describe('WorkerFarm', () => { } ); - await new Promise(resolve => workerfarm.once('started', resolve)); - assert.equal(await workerfarm.run(1, 2), 3); await workerfarm.end(); @@ -134,8 +126,6 @@ describe('WorkerFarm', () => { } ); - await new Promise(resolve => workerfarm.once('started', resolve)); - for (let i = 0; i < 1000; i++) { assert.equal(await workerfarm.run(1 + i, 2), 3 + i); } @@ -153,8 +143,6 @@ describe('WorkerFarm', () => { } ); - await new Promise(resolve => workerfarm.once('started', resolve)); - let result = await workerfarm.run(); assert.equal(result.length, 2); assert.equal(result[1], process.pid); @@ -174,8 +162,6 @@ describe('WorkerFarm', () => { } ); - await new Promise(resolve => workerfarm.once('started', resolve)); - let bigData = []; for (let i = 0; i < 10000; i++) { bigData.push('This is some big data');