Skip to content

Commit

Permalink
worker: make MessagePort inherit from EventTarget
Browse files Browse the repository at this point in the history
Use `NodeEventTarget` to provide a mixed `EventEmitter`/`EventTarget`
API interface.

PR-URL: #34057
Refs: https://twitter.com/addaleax/status/1276289101671608320
Reviewed-By: James M Snell <jasnell@gmail.com>
Reviewed-By: Benjamin Gruenbaum <benjamingr@gmail.com>
Reviewed-By: David Carlier <devnexen@gmail.com>
Reviewed-By: Matteo Collina <matteo.collina@gmail.com>
  • Loading branch information
addaleax authored and ruyadorno committed Jul 29, 2020
1 parent c93a898 commit 0aa3809
Show file tree
Hide file tree
Showing 9 changed files with 126 additions and 69 deletions.
17 changes: 15 additions & 2 deletions benchmark/worker/messageport.js
Expand Up @@ -4,6 +4,7 @@ const common = require('../common.js');
const { MessageChannel } = require('worker_threads');
const bench = common.createBenchmark(main, {
payload: ['string', 'object'],
style: ['eventtarget', 'eventemitter'],
n: [1e6]
});

Expand All @@ -25,14 +26,26 @@ function main(conf) {
const { port1, port2 } = new MessageChannel();

let messages = 0;
port2.onmessage = () => {
function listener() {
if (messages++ === n) {
bench.end(n);
port1.close();
} else {
write();
}
};
}

switch (conf.style) {
case 'eventtarget':
port2.onmessage = listener;
break;
case 'eventemitter':
port2.on('message', listener);
break;
default:
throw new Error('Unsupported listener type');
}

bench.start();
write();

Expand Down
27 changes: 23 additions & 4 deletions lib/internal/per_context/messageport.js
@@ -1,12 +1,31 @@
'use strict';
const {
SymbolFor,
} = primordials;

class MessageEvent {
constructor(data, target) {
constructor(data, target, type) {
this.data = data;
this.target = target;
this.type = type;
}
}

exports.emitMessage = function(data) {
if (typeof this.onmessage === 'function')
this.onmessage(new MessageEvent(data, this));
const kHybridDispatch = SymbolFor('nodejs.internal.kHybridDispatch');

exports.emitMessage = function(data, type) {
if (typeof this[kHybridDispatch] === 'function') {
this[kHybridDispatch](data, type, undefined);
return;
}

const event = new MessageEvent(data, this, type);
if (type === 'message') {
if (typeof this.onmessage === 'function')
this.onmessage(event);
} else {
// eslint-disable-next-line no-lonely-if
if (typeof this.onmessageerror === 'function')
this.onmessageerror(event);
}
};
111 changes: 65 additions & 46 deletions lib/internal/worker/io.js
Expand Up @@ -24,20 +24,23 @@ const {
stopMessagePort
} = internalBinding('messaging');
const {
threadId,
getEnvMessagePort
} = internalBinding('worker');

const { Readable, Writable } = require('stream');
const EventEmitter = require('events');
const {
Event,
NodeEventTarget,
defineEventHandler,
initNodeEventTarget,
kCreateEvent,
kNewListener,
kRemoveListener,
} = require('internal/event_target');
const { inspect } = require('internal/util/inspect');
let debug = require('internal/util/debuglog').debuglog('worker', (fn) => {
debug = fn;
});

const kIncrementsPortRef = Symbol('kIncrementsPortRef');
const kName = Symbol('kName');
const kOnMessageListener = Symbol('kOnMessageListener');
const kPort = Symbol('kPort');
const kWaitingStreams = Symbol('kWaitingStreams');
const kWritableCallbacks = Symbol('kWritableCallbacks');
Expand All @@ -54,55 +57,47 @@ const messageTypes = {
};

// We have to mess with the MessagePort prototype a bit, so that a) we can make
// it inherit from EventEmitter, even though it is a C++ class, and b) we do
// it inherit from NodeEventTarget, even though it is a C++ class, and b) we do
// not provide methods that are not present in the Browser and not documented
// on our side (e.g. hasRef).
// Save a copy of the original set of methods as a shallow clone.
const MessagePortPrototype = ObjectCreate(
ObjectGetPrototypeOf(MessagePort.prototype),
ObjectGetOwnPropertyDescriptors(MessagePort.prototype));
// Set up the new inheritance chain.
ObjectSetPrototypeOf(MessagePort, EventEmitter);
ObjectSetPrototypeOf(MessagePort.prototype, EventEmitter.prototype);
ObjectSetPrototypeOf(MessagePort, NodeEventTarget);
ObjectSetPrototypeOf(MessagePort.prototype, NodeEventTarget.prototype);
// Copy methods that are inherited from HandleWrap, because
// changing the prototype of MessagePort.prototype implicitly removed them.
MessagePort.prototype.ref = MessagePortPrototype.ref;
MessagePort.prototype.unref = MessagePortPrototype.unref;

// A communication channel consisting of a handle (that wraps around an
// uv_async_t) which can receive information from other threads and emits
// .onmessage events, and a function used for sending data to a MessagePort
// in some other thread.
MessagePort.prototype[kOnMessageListener] = function onmessage(event) {
if (event.data && event.data.type !== messageTypes.STDIO_WANTS_MORE_DATA)
debug(`[${threadId}] received message`, event);
// Emit the deserialized object to userland.
this.emit('message', event.data);
};

// This is for compatibility with the Web's MessagePort API. It makes sense to
// provide it as an `EventEmitter` in Node.js, but if somebody overrides
// `onmessage`, we'll switch over to the Web API model.
ObjectDefineProperty(MessagePort.prototype, 'onmessage', {
enumerable: true,
configurable: true,
get() {
return this[kOnMessageListener];
},
set(value) {
this[kOnMessageListener] = value;
if (typeof value === 'function') {
this.ref();
MessagePortPrototype.start.call(this);
} else {
this.unref();
stopMessagePort(this);
}
class MessageEvent extends Event {
constructor(data, target, type) {
super(type);
this.data = data;
}
});
}

ObjectDefineProperty(
MessagePort.prototype,
kCreateEvent,
{
value: function(data, type) {
return new MessageEvent(data, this, type);
},
configurable: false,
writable: false,
enumerable: false,
});

// This is called from inside the `MessagePort` constructor.
function oninit() {
initNodeEventTarget(this);
// TODO(addaleax): This should be on MessagePort.prototype, but
// defineEventHandler() does not support that.
defineEventHandler(this, 'message');
defineEventHandler(this, 'messageerror');
setupPortReferencing(this, this, 'message');
}

Expand All @@ -112,9 +107,15 @@ ObjectDefineProperty(MessagePort.prototype, onInitSymbol, {
value: oninit
});

class MessagePortCloseEvent extends Event {
constructor() {
super('close');
}
}

// This is called after the underlying `uv_async_t` has been closed.
function onclose() {
this.emit('close');
this.dispatchEvent(new MessagePortCloseEvent());
}

ObjectDefineProperty(MessagePort.prototype, handleOnCloseSymbol, {
Expand Down Expand Up @@ -156,18 +157,36 @@ function setupPortReferencing(port, eventEmitter, eventName) {
// If there are none or all are removed, unref() the channel so the worker
// can shutdown gracefully.
port.unref();
eventEmitter.on('newListener', (name) => {
if (name === eventName && eventEmitter.listenerCount(eventName) === 0) {
eventEmitter.on('newListener', function(name) {
if (name === eventName) newListener(eventEmitter.listenerCount(name));
});
eventEmitter.on('removeListener', function(name) {
if (name === eventName) removeListener(eventEmitter.listenerCount(name));
});
const origNewListener = eventEmitter[kNewListener];
eventEmitter[kNewListener] = function(size, type, ...args) {
if (type === eventName) newListener(size - 1);
return origNewListener.call(this, size, type, ...args);
};
const origRemoveListener = eventEmitter[kRemoveListener];
eventEmitter[kRemoveListener] = function(size, type, ...args) {
if (type === eventName) removeListener(size);
return origRemoveListener.call(this, size, type, ...args);
};

function newListener(size) {
if (size === 0) {
port.ref();
MessagePortPrototype.start.call(port);
}
});
eventEmitter.on('removeListener', (name) => {
if (name === eventName && eventEmitter.listenerCount(eventName) === 0) {
}

function removeListener(size) {
if (size === 0) {
stopMessagePort(port);
port.unref();
}
});
}
}


Expand Down
17 changes: 9 additions & 8 deletions src/node_messaging.cc
Expand Up @@ -747,6 +747,8 @@ void MessagePort::OnMessage() {

Local<Value> payload;
Local<Value> message_error;
Local<Value> argv[2];

{
// Catch any exceptions from parsing the message itself (not from
// emitting it) as 'messageeror' events.
Expand All @@ -765,16 +767,15 @@ void MessagePort::OnMessage() {
continue;
}

if (MakeCallback(emit_message, 1, &payload).IsEmpty()) {
argv[0] = payload;
argv[1] = env()->message_string();

if (MakeCallback(emit_message, arraysize(argv), argv).IsEmpty()) {
reschedule:
if (!message_error.IsEmpty()) {
// This should become a `messageerror` event in the sense of the
// EventTarget API at some point.
Local<Value> argv[] = {
env()->messageerror_string(),
message_error
};
USE(MakeCallback(env()->emit_string(), arraysize(argv), argv));
argv[0] = message_error;
argv[1] = env()->messageerror_string();
USE(MakeCallback(emit_message, arraysize(argv), argv));
}

// Re-schedule OnMessage() execution in case of failure.
Expand Down
2 changes: 2 additions & 0 deletions test/parallel/test-bootstrap-modules.js
Expand Up @@ -99,6 +99,7 @@ if (!common.isMainThread) {
'NativeModule _stream_transform',
'NativeModule _stream_writable',
'NativeModule internal/error_serdes',
'NativeModule internal/event_target',
'NativeModule internal/process/worker_thread_only',
'NativeModule internal/streams/buffer_list',
'NativeModule internal/streams/destroy',
Expand All @@ -109,6 +110,7 @@ if (!common.isMainThread) {
'NativeModule internal/worker',
'NativeModule internal/worker/io',
'NativeModule stream',
'NativeModule util',
'NativeModule worker_threads',
].forEach(expectedModules.add.bind(expectedModules));
}
Expand Down
6 changes: 3 additions & 3 deletions test/parallel/test-crypto-key-objects-messageport.js
Expand Up @@ -75,9 +75,9 @@ for (const [key, repr] of keys) {

// TODO(addaleax): Switch this to a 'messageerror' event once MessagePort
// implements EventTarget fully and in a cross-context manner.
port2moved.emit = common.mustCall((name, err) => {
assert.strictEqual(name, 'messageerror');
assert.strictEqual(err.code, 'ERR_MESSAGE_TARGET_CONTEXT_UNAVAILABLE');
port2moved.onmessageerror = common.mustCall((event) => {
assert.strictEqual(event.data.code,
'ERR_MESSAGE_TARGET_CONTEXT_UNAVAILABLE');
});

port2moved.start();
Expand Down
Expand Up @@ -10,8 +10,9 @@ const { MessageChannel } = require('worker_threads');

async_hooks.createHook({
init: common.mustCall((id, type, triggerId, resource) => {
assert.strictEqual(util.inspect(resource),
'MessagePort { active: true, refed: false }');
assert.strictEqual(
util.inspect(resource),
'MessagePort [EventTarget] { active: true, refed: false }');
}, 2)
}).enable();

Expand Down
6 changes: 3 additions & 3 deletions test/parallel/test-worker-message-port-transfer-filehandle.js
Expand Up @@ -57,9 +57,9 @@ const { once } = require('events');
});
// TODO(addaleax): Switch this to a 'messageerror' event once MessagePort
// implements EventTarget fully and in a cross-context manner.
port2moved.emit = common.mustCall((name, err) => {
assert.strictEqual(name, 'messageerror');
assert.strictEqual(err.code, 'ERR_MESSAGE_TARGET_CONTEXT_UNAVAILABLE');
port2moved.onmessageerror = common.mustCall((event) => {
assert.strictEqual(event.data.code,
'ERR_MESSAGE_TARGET_CONTEXT_UNAVAILABLE');
});
port2moved.start();

Expand Down
4 changes: 3 additions & 1 deletion test/parallel/test-worker-message-port.js
Expand Up @@ -154,7 +154,9 @@ const { MessageChannel, MessagePort } = require('worker_threads');
assert.deepStrictEqual(
Object.getOwnPropertyNames(MessagePort.prototype).sort(),
[
'close', 'constructor', 'onmessage', 'postMessage', 'ref', 'start',
// TODO(addaleax): This should include onmessage (and eventually
// onmessageerror).
'close', 'constructor', 'postMessage', 'ref', 'start',
'unref'
]);
}

0 comments on commit 0aa3809

Please sign in to comment.