Skip to content

Commit

Permalink
Replace node vm with worker_threads
Browse files Browse the repository at this point in the history
  • Loading branch information
appurva21 committed Apr 19, 2024
1 parent e3ab7ea commit 56e8dae
Show file tree
Hide file tree
Showing 20 changed files with 196 additions and 548 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.yaml
@@ -1,4 +1,6 @@
unreleased:
new feature:
- Replaced Node VM with Worker threads
chores:
- Add GitHub CI and remove Travis

Expand Down
2 changes: 1 addition & 1 deletion README.md
@@ -1,6 +1,6 @@
# UVM [![CI](https://github.com/postmanlabs/uvm/actions/workflows/ci.yml/badge.svg?branch=develop)](https://github.com/postmanlabs/uvm/actions/workflows/ci.yml) [![codecov](https://codecov.io/gh/postmanlabs/uvm/branch/develop/graph/badge.svg)](https://codecov.io/gh/postmanlabs/uvm)

Module that exposes an event emitter to send data across contexts ([VM](https://nodejs.org/api/vm.html) in Node.js and [Web Workers](https://www.w3.org/TR/workers/) in browser).
Module that exposes an event emitter to send data across contexts ([Worker threads](https://nodejs.org/api/worker_threads.html) in Node.js and [Web Workers](https://www.w3.org/TR/workers/) in browser).

## Installation
UVM can be installed using NPM or directly from the git repository within your NodeJS projects. If installing from NPM, the following command installs the module and saves it as a dependency in your `package.json`:
Expand Down
11 changes: 11 additions & 0 deletions firmware/sandbox-base.browser.js
@@ -0,0 +1,11 @@
/**
 * Source text (exported as a string, not executed here) of the base script for
 * the browser sandbox.
 *
 * When evaluated in a scope that provides `self`, the script registers a
 * `message` listener that removes itself on the first message it receives. It
 * then looks for `__init_uvm` either directly on the received value or on its
 * `data` property and, only when that value is a string, runs it with `eval`.
 *
 * NOTE(review): the `eval` is intentional (see the eslint-disable inside the
 * string); the evaluated code is whatever the sender of the first message
 * supplies, so the message channel must be trusted.
 */
module.exports = `
(function (self) {
var init = function (e) {
self.removeEventListener('message', init);
const __init_uvm = e && (e.__init_uvm || (e.data && e.data.__init_uvm));
// eslint-disable-next-line no-eval
(typeof __init_uvm === 'string') && eval(__init_uvm);
};
self.addEventListener('message', init);
}(self));
`;
11 changes: 5 additions & 6 deletions firmware/sandbox-base.js
@@ -1,10 +1,9 @@
module.exports = `
(function (self) {
var init = function (e) {
self.removeEventListener('message', init);
(function (parentPort) {
var init = function (m) {
// eslint-disable-next-line no-eval
(e && e.data && (typeof e.data.__init_uvm === 'string')) && eval(e.data.__init_uvm);
m && m.__init_uvm && (typeof m.__init_uvm === 'string') && eval(m.__init_uvm);
};
self.addEventListener('message', init);
}(self));
parentPort.once('message', init);
}(require('worker_threads').parentPort));
`;
2 changes: 1 addition & 1 deletion lib/bridge-client.js
Expand Up @@ -11,7 +11,7 @@
const toString = String.prototype.toString;

/**
* Generate code to be executed inside a VM for bootstrap.
* Generate code to be executed inside a worker for bootstrap.
*
* @param {String|Buffer} bootCode
* @return {String}
Expand Down
19 changes: 9 additions & 10 deletions lib/bridge.browser.js
@@ -1,9 +1,4 @@
/* istanbul ignore file */
/*
* @note options.dispatchTimeout is not implemented in browser sandbox because
* there is no way to interrupt an infinite loop.
* Maybe terminate and restart the worker or execute in nested worker.
*/
const Flatted = require('flatted'),
{ randomNumber } = require('./utils'),

Expand All @@ -20,12 +15,14 @@ const Flatted = require('flatted'),
* @private
* @param {String} code -
* @param {String} id -
* @param {Boolean} debug -
* @return {String}
*/
sandboxFirmware = (code, id) => {
sandboxFirmware = (code, id, debug) => {
// @note self.postMessage and self.addEventListener methods are cached
// in variable or closure because bootCode might mutate the global scope
return `
!${debug} && (console = new Proxy({}, { get: function () { return function () {}; } }));
__uvm_emit = function (postMessage, args) {
postMessage({__id_uvm: "${id}",__emit_uvm: args});
}.bind(null, self.postMessage);
Expand Down Expand Up @@ -72,7 +69,7 @@ module.exports = function (bridge, options, callback) {
},

// function to terminate worker
terminateWorker = function () {
terminateWorker = function (callback) {
if (!worker) { return; }

// remove event listeners for this sandbox
Expand All @@ -89,6 +86,7 @@ module.exports = function (bridge, options, callback) {
}

worker = null;
callback && callback();
};

// on load attach the dispatcher
Expand All @@ -112,12 +110,13 @@ module.exports = function (bridge, options, callback) {
});

// get firmware code string with boot code
firmwareCode = sandboxFirmware(bridgeClientCode(options.bootCode), id);
firmwareCode = sandboxFirmware(bridgeClientCode(options.bootCode), id, options.debug);

// start boot timer, stops once we get the load signal, terminate otherwise
bootTimer = setTimeout(() => {
terminateWorker();
callback(new Error(`uvm: boot timed out after ${options.bootTimeout}ms.`));
terminateWorker(() => {
callback(new Error(`uvm: boot timed out after ${options.bootTimeout}ms.`));
});
}, options.bootTimeout);

// if sandbox worker is provided, we simply need to init with firmware code
Expand Down
234 changes: 116 additions & 118 deletions lib/bridge.js
@@ -1,142 +1,140 @@
const vm = require('vm'),
Flatted = require('flatted'),
/* istanbul ignore file */

{ isString, randomNumber } = require('./utils'),

bridgeClientCode = require('./bridge-client'),
delegateTimers = require('./vm-delegate-timers'),
const Flatted = require('flatted'),
{ randomNumber } = require('./utils'),
{ Worker } = require('worker_threads'),

ERROR = 'error',
UVM_DATA_ = '__uvm_data_',
UVM_DISPATCH_ = '__uvm_dispatch_',
MESSAGE = 'message',
UVM_ID_ = '__id_uvm_',
MESSAGE_ERROR = 'messageerror',

// code for bridge
bridgeClientCode = require('./bridge-client'),

/**
* Convert array or arguments object to JSON
* Returns the firmware code to be executed inside Worker thread.
*
* @private
* @param {Array|Argument} arr
* @param {String} code -
* @param {String} id -
* @return {String}
*
* @note This has been held as reference to avoid being misused if modified in global context;
*/
jsonArray = (function (arrayProtoSlice, jsonStringify) {
return function (arr) {
return jsonStringify(arrayProtoSlice.call(arr));
};
}(Array.prototype.slice, Flatted.stringify)),
sandboxFirmware = (code, id) => {
// The generated script performs these steps, in order:
//   1. reads `parentPort` from `worker_threads`, binds `__uvm_emit` (posts
//      `{__id_uvm, __emit_uvm}` to the parent) and `__uvm_addEventListener`
//      (parentPort.on) to it, then nulls and deletes the `__parentPort` global
//   2. runs `code`; an exception thrown by it is re-thrown from a zero-delay
//      timer instead of aborting the rest of this script
//   3. subscribes to "message" and forwards payloads whose `__emit_uvm` is a
//      string and whose `__id_uvm` equals `id` to `__uvm_dispatch`
//   4. emits the Flatted-serialised `load.<id>` event to the parent
//   5. nulls and deletes `__uvm_dispatch`, `__uvm_emit` and
//      `__uvm_addEventListener` so later code cannot reach them by name
// NOTE(review): `__uvm_dispatch` is not defined in this template; presumably
// `code` (the bridge client) defines it - confirm against ./bridge-client.
// `id` is interpolated verbatim into double-quoted string literals.
return `
__parentPort = require('worker_threads').parentPort;
__uvm_emit = function (parentPort, args) {
parentPort.postMessage({__id_uvm: "${id}",__emit_uvm: args});
}.bind(null, __parentPort);
__uvm_addEventListener = __parentPort.on.bind(__parentPort);
__parentPort = null; delete __parentPort;
try {${code}} catch (e) { setTimeout(function () { throw e; }, 0); }
(function (emit, id) {
__uvm_addEventListener("message", function (m) {
(m && (typeof m.__emit_uvm === 'string') && (m.__id_uvm === id)) &&
emit(m.__emit_uvm);
});
}(__uvm_dispatch, "${id}"));
__uvm_emit('${Flatted.stringify(['load.' + id])}');
__uvm_dispatch = null; __uvm_emit = null; __uvm_addEventListener = null;
delete __uvm_dispatch; delete __uvm_emit; delete __uvm_addEventListener;
`;
};

/**
* @private
* @param {String} str
* @return {Array}
*/
unJsonArray = (function (jsonParse) {
return function (str) {
return jsonParse(str);
module.exports = function (bridge, options, callback) {
let worker,
bootTimer,
firmwareCode;

const id = UVM_ID_ + randomNumber(),

// function to forward messages emitted
forwardEmits = (m) => {
if (!(m && (typeof m.__emit_uvm === 'string') && (m.__id_uvm === id))) { return; }

let args;

try { args = Flatted.parse(m.__emit_uvm); }
catch (err) { return bridge.emit(ERROR, err); }
bridge.emit(...args);
},

// function to forward errors emitted
forwardErrors = (e) => {
bridge.emit(ERROR, e);
},

/**
 * Detaches this bridge's listeners from the worker and, when the worker was
 * spawned by this module, terminates it. Does nothing if there is no active
 * worker (e.g. already disconnected).
 *
 * @private
 * @param {Function} [callback] - Invoked once teardown is done. Receives the
 * exit code when a spawned worker was terminated, and no arguments when an
 * externally provided worker (`options._sandbox`) was only detached.
 * Non-function values are ignored.
 */
terminateWorker = function (callback) {
    const activeWorker = worker,
        done = (typeof callback === 'function') ? callback : null;

    if (!activeWorker) { return; }

    // drop the shared reference first so that any dispatch attempted from
    // here on is reported as "post disconnection"
    worker = null;

    // remove event listeners for this sandbox
    activeWorker.off(MESSAGE, forwardEmits);
    activeWorker.off(ERROR, forwardErrors);
    activeWorker.off(MESSAGE_ERROR, forwardErrors);

    // an externally provided worker is owned by the caller and must not be
    // terminated, but the callback still has to fire; otherwise the boot
    // timeout error would never be reported for such workers
    if (options._sandbox) {
        done && done();

        return;
    }

    activeWorker.terminate()
        .then((exitCode) => { done && done(exitCode); });
};
}(Flatted.parse));

/**
* This function equips an event emitter with communication capability with a VM.
*
* @param {EventEmitter} emitter -
* @param {Object} options -
* @param {String} options.bootCode -
* @param {vm~Context=} [options._sandbox] -
* @param {Function} callback -
*/
module.exports = function (emitter, options, callback) {
let code = bridgeClientCode(options.bootCode),
context = options._sandbox || vm.createContext(Object.create(null)),
bridgeDispatch;

// inject console on debug mode
options.debug && (context.console = console);

// we need to inject the timers inside vm since VM does not have timers
if (!options._sandbox) {
delegateTimers(context);
}

try {
// inject the emitter via context. it will be referenced by the bridge and then deleted to prevent
// additional access
context.__uvm_emit = function (args) {
/* istanbul ignore if */
if (!isString(args)) { return; }
// on load attach the dispatcher
bridge.once('load.' + id, () => {
// stop boot timer first
clearTimeout(bootTimer);

try { args = unJsonArray(args); }
catch (err) { /* istanbul ignore next */ emitter.emit(ERROR, err); }
bridge._dispatch = function () {
if (!worker) {
return bridge.emit(ERROR,
new Error('uvm: unable to dispatch "' + arguments[0] + '" post disconnection.'));
}

emitter.emit(...args);
worker.postMessage({
__emit_uvm: Flatted.stringify(Array.prototype.slice.call(arguments)),
__id_uvm: id
});
};

vm.runInContext(code, context, {
timeout: options.bootTimeout
callback(null, bridge);
});

// get firmware code string with boot code
firmwareCode = sandboxFirmware(bridgeClientCode(options.bootCode), id);

// start boot timer, stops once we get the load signal, terminate otherwise
bootTimer = setTimeout(() => {
terminateWorker(() => {
callback(new Error(`uvm: boot timed out after ${options.bootTimeout}ms.`));
});
}, options.bootTimeout);

// we keep a reference to the dispatcher so that we can preemptively re inject it in case it is deleted
// by user scripts
bridgeDispatch = context.__uvm_dispatch;
// if sandbox worker is provided, we simply need to init with firmware code
// @todo validate sandbox type or APIs
if (options._sandbox) {
worker = options._sandbox;
worker.postMessage({ __init_uvm: firmwareCode });
}
catch (err) {
return callback(err);
}
finally { // set all raw interface methods to null (except the dispatcher since we need it later)
vm.runInContext(`
__uvm_emit = null; delete __uvm_emit; __uvm_dispatch = null; delete __uvm_dispatch;
`, context);
delete context.__uvm_emit;
delete context.__uvm_dispatch;
// else, spawn a new worker
else {
worker = new Worker(firmwareCode, {
eval: true,
stdout: !options.debug,
stderr: !options.debug
});
}

// since context is created and emitter is bound, we would now attach the send function
emitter._dispatch = function () {
const id = UVM_DATA_ + randomNumber(),
dispatchId = UVM_DISPATCH_ + id;

// trigger event if any dispatch happens post disconnection
if (!context) {
return this.emit(ERROR, new Error(`uvm: unable to dispatch "${arguments[0]}" post disconnection.`));
}

try {
// save the data in context. by this method, we avoid needless string and character encoding or escaping
// issues. this is slightly prone to race condition issues, but using random numbers we intend to solve it
context[id] = jsonArray(arguments);
context[dispatchId] = bridgeDispatch;

// restore the dispatcher for immediate use!
vm.runInContext(`
(function (dispatch, data) {
${id} = null; (delete ${id});
${dispatchId} = null; (delete ${dispatchId});
dispatch(String(data));
}(${dispatchId}, ${id}));
`, context, {
timeout: options.dispatchTimeout
});
}
// swallow errors since other platforms will not trigger error if execution fails
catch (e) { this.emit(ERROR, e); }
finally { // precautionary delete
if (context) {
delete context[id];
delete context[dispatchId];
}
}
};

emitter._disconnect = function () {
/* istanbul ignore if */
if (!context) { return; }
// add event listener for receiving events
// from worker (is removed on disconnect)
worker.on(MESSAGE, forwardEmits);
worker.on(ERROR, forwardErrors);
worker.on(MESSAGE_ERROR, forwardErrors);

// clear only if the context was created inside this function
!options._sandbox && Object.keys(context).forEach((prop) => {
delete context[prop];
});
context = null;
};
// equip bridge to disconnect (i.e. terminate the worker)
bridge._disconnect = terminateWorker;

callback(null, emitter);
// help GC collect large variables
firmwareCode = null;
};

0 comments on commit 56e8dae

Please sign in to comment.