diff --git a/benchmark/worker/messageport.js b/benchmark/worker/messageport.js index 8e2ddae73ff3ab..2f0d6f0621e8c8 100644 --- a/benchmark/worker/messageport.js +++ b/benchmark/worker/messageport.js @@ -4,6 +4,7 @@ const common = require('../common.js'); const { MessageChannel } = require('worker_threads'); const bench = common.createBenchmark(main, { payload: ['string', 'object'], + style: ['eventtarget', 'eventemitter'], n: [1e6] }); @@ -25,14 +26,26 @@ function main(conf) { const { port1, port2 } = new MessageChannel(); let messages = 0; - port2.onmessage = () => { + function listener() { if (messages++ === n) { bench.end(n); port1.close(); } else { write(); } - }; + } + + switch (conf.style) { + case 'eventtarget': + port2.onmessage = listener; + break; + case 'eventemitter': + port2.on('message', listener); + break; + default: + throw new Error('Unsupported listener type'); + } + bench.start(); write(); diff --git a/lib/internal/per_context/messageport.js b/lib/internal/per_context/messageport.js index ee08a596122a51..43587319b040de 100644 --- a/lib/internal/per_context/messageport.js +++ b/lib/internal/per_context/messageport.js @@ -1,12 +1,31 @@ 'use strict'; +const { + SymbolFor, +} = primordials; + class MessageEvent { - constructor(data, target) { + constructor(data, target, type) { this.data = data; this.target = target; + this.type = type; } } -exports.emitMessage = function(data) { - if (typeof this.onmessage === 'function') - this.onmessage(new MessageEvent(data, this)); +const kHybridDispatch = SymbolFor('nodejs.internal.kHybridDispatch'); + +exports.emitMessage = function(data, type) { + if (typeof this[kHybridDispatch] === 'function') { + this[kHybridDispatch](data, type, undefined); + return; + } + + const event = new MessageEvent(data, this, type); + if (type === 'message') { + if (typeof this.onmessage === 'function') + this.onmessage(event); + } else { + // eslint-disable-next-line no-lonely-if + if (typeof this.onmessageerror === 'function') + this.onmessageerror(event); + } }; diff --git a/lib/internal/worker/io.js b/lib/internal/worker/io.js index 60dd8cd67d6186..5b5118a1d70b21 100644 --- a/lib/internal/worker/io.js +++ b/lib/internal/worker/io.js @@ -24,20 +24,23 @@ const { stopMessagePort } = internalBinding('messaging'); const { - threadId, getEnvMessagePort } = internalBinding('worker'); const { Readable, Writable } = require('stream'); -const EventEmitter = require('events'); +const { + Event, + NodeEventTarget, + defineEventHandler, + initNodeEventTarget, + kCreateEvent, + kNewListener, + kRemoveListener, +} = require('internal/event_target'); const { inspect } = require('internal/util/inspect'); -let debug = require('internal/util/debuglog').debuglog('worker', (fn) => { - debug = fn; -}); const kIncrementsPortRef = Symbol('kIncrementsPortRef'); const kName = Symbol('kName'); -const kOnMessageListener = Symbol('kOnMessageListener'); const kPort = Symbol('kPort'); const kWaitingStreams = Symbol('kWaitingStreams'); const kWritableCallbacks = Symbol('kWritableCallbacks'); @@ -54,7 +57,7 @@ const messageTypes = { }; // We have to mess with the MessagePort prototype a bit, so that a) we can make -// it inherit from EventEmitter, even though it is a C++ class, and b) we do +// it inherit from NodeEventTarget, even though it is a C++ class, and b) we do // not provide methods that are not present in the Browser and not documented // on our side (e.g. hasRef). // Save a copy of the original set of methods as a shallow clone. @@ -62,47 +65,39 @@ const MessagePortPrototype = ObjectCreate( ObjectGetPrototypeOf(MessagePort.prototype), ObjectGetOwnPropertyDescriptors(MessagePort.prototype)); // Set up the new inheritance chain. -ObjectSetPrototypeOf(MessagePort, EventEmitter); -ObjectSetPrototypeOf(MessagePort.prototype, EventEmitter.prototype); +ObjectSetPrototypeOf(MessagePort, NodeEventTarget); +ObjectSetPrototypeOf(MessagePort.prototype, NodeEventTarget.prototype); // Copy methods that are inherited from HandleWrap, because // changing the prototype of MessagePort.prototype implicitly removed them. MessagePort.prototype.ref = MessagePortPrototype.ref; MessagePort.prototype.unref = MessagePortPrototype.unref; -// A communication channel consisting of a handle (that wraps around an -// uv_async_t) which can receive information from other threads and emits -// .onmessage events, and a function used for sending data to a MessagePort -// in some other thread. -MessagePort.prototype[kOnMessageListener] = function onmessage(event) { - if (event.data && event.data.type !== messageTypes.STDIO_WANTS_MORE_DATA) - debug(`[${threadId}] received message`, event); - // Emit the deserialized object to userland. - this.emit('message', event.data); -}; - -// This is for compatibility with the Web's MessagePort API. It makes sense to -// provide it as an `EventEmitter` in Node.js, but if somebody overrides -// `onmessage`, we'll switch over to the Web API model. -ObjectDefineProperty(MessagePort.prototype, 'onmessage', { - enumerable: true, - configurable: true, - get() { - return this[kOnMessageListener]; - }, - set(value) { - this[kOnMessageListener] = value; - if (typeof value === 'function') { - this.ref(); - MessagePortPrototype.start.call(this); - } else { - this.unref(); - stopMessagePort(this); - } +class MessageEvent extends Event { + constructor(data, target, type) { + super(type); + this.data = data; } -}); +} + +ObjectDefineProperty( + MessagePort.prototype, + kCreateEvent, + { + value: function(data, type) { + return new MessageEvent(data, this, type); + }, + configurable: false, + writable: false, + enumerable: false, + }); // This is called from inside the `MessagePort` constructor. function oninit() { + initNodeEventTarget(this); + // TODO(addaleax): This should be on MessagePort.prototype, but + // defineEventHandler() does not support that. + defineEventHandler(this, 'message'); + defineEventHandler(this, 'messageerror'); setupPortReferencing(this, this, 'message'); } @@ -112,9 +107,15 @@ ObjectDefineProperty(MessagePort.prototype, onInitSymbol, { value: oninit }); +class MessagePortCloseEvent extends Event { + constructor() { + super('close'); + } +} + // This is called after the underlying `uv_async_t` has been closed. function onclose() { - this.emit('close'); + this.dispatchEvent(new MessagePortCloseEvent()); } ObjectDefineProperty(MessagePort.prototype, handleOnCloseSymbol, { @@ -156,18 +157,36 @@ function setupPortReferencing(port, eventEmitter, eventName) { // If there are none or all are removed, unref() the channel so the worker // can shutdown gracefully. port.unref(); - eventEmitter.on('newListener', (name) => { - if (name === eventName && eventEmitter.listenerCount(eventName) === 0) { + eventEmitter.on('newListener', function(name) { + if (name === eventName) newListener(eventEmitter.listenerCount(name)); + }); + eventEmitter.on('removeListener', function(name) { + if (name === eventName) removeListener(eventEmitter.listenerCount(name)); + }); + const origNewListener = eventEmitter[kNewListener]; + eventEmitter[kNewListener] = function(size, type, ...args) { + if (type === eventName) newListener(size - 1); + return origNewListener.call(this, size, type, ...args); + }; + const origRemoveListener = eventEmitter[kRemoveListener]; + eventEmitter[kRemoveListener] = function(size, type, ...args) { + if (type === eventName) removeListener(size); + return origRemoveListener.call(this, size, type, ...args); + }; + + function newListener(size) { + if (size === 0) { port.ref(); MessagePortPrototype.start.call(port); } - }); - eventEmitter.on('removeListener', (name) => { - if (name === eventName && eventEmitter.listenerCount(eventName) === 0) { + } + + function removeListener(size) { + if (size === 0) { stopMessagePort(port); port.unref(); } - }); + } } diff --git a/src/node_messaging.cc b/src/node_messaging.cc index 91927ebbd9ebb1..55f7b0b2cbfeca 100644 --- a/src/node_messaging.cc +++ b/src/node_messaging.cc @@ -747,6 +747,8 @@ void MessagePort::OnMessage() { Local payload; Local message_error; + Local argv[2]; + { // Catch any exceptions from parsing the message itself (not from // emitting it) as 'messageeror' events. @@ -765,16 +767,15 @@ void MessagePort::OnMessage() { continue; } - if (MakeCallback(emit_message, 1, &payload).IsEmpty()) { + argv[0] = payload; + argv[1] = env()->message_string(); + + if (MakeCallback(emit_message, arraysize(argv), argv).IsEmpty()) { reschedule: if (!message_error.IsEmpty()) { - // This should become a `messageerror` event in the sense of the - // EventTarget API at some point. - Local argv[] = { - env()->messageerror_string(), - message_error - }; - USE(MakeCallback(env()->emit_string(), arraysize(argv), argv)); + argv[0] = message_error; + argv[1] = env()->messageerror_string(); + USE(MakeCallback(emit_message, arraysize(argv), argv)); } // Re-schedule OnMessage() execution in case of failure. diff --git a/test/parallel/test-bootstrap-modules.js b/test/parallel/test-bootstrap-modules.js index 33c421c2f13fb6..f7f1d2583d928c 100644 --- a/test/parallel/test-bootstrap-modules.js +++ b/test/parallel/test-bootstrap-modules.js @@ -99,6 +99,7 @@ if (!common.isMainThread) { 'NativeModule _stream_transform', 'NativeModule _stream_writable', 'NativeModule internal/error_serdes', + 'NativeModule internal/event_target', 'NativeModule internal/process/worker_thread_only', 'NativeModule internal/streams/buffer_list', 'NativeModule internal/streams/destroy', @@ -109,6 +110,7 @@ if (!common.isMainThread) { 'NativeModule internal/worker', 'NativeModule internal/worker/io', 'NativeModule stream', + 'NativeModule util', 'NativeModule worker_threads', ].forEach(expectedModules.add.bind(expectedModules)); } diff --git a/test/parallel/test-crypto-key-objects-messageport.js b/test/parallel/test-crypto-key-objects-messageport.js index 1b910b571f77fc..f38e20da420aae 100644 --- a/test/parallel/test-crypto-key-objects-messageport.js +++ b/test/parallel/test-crypto-key-objects-messageport.js @@ -75,9 +75,9 @@ for (const [key, repr] of keys) { // TODO(addaleax): Switch this to a 'messageerror' event once MessagePort // implements EventTarget fully and in a cross-context manner. - port2moved.emit = common.mustCall((name, err) => { - assert.strictEqual(name, 'messageerror'); - assert.strictEqual(err.code, 'ERR_MESSAGE_TARGET_CONTEXT_UNAVAILABLE'); + port2moved.onmessageerror = common.mustCall((event) => { + assert.strictEqual(event.data.code, + 'ERR_MESSAGE_TARGET_CONTEXT_UNAVAILABLE'); }); port2moved.start(); diff --git a/test/parallel/test-worker-message-port-inspect-during-init-hook.js b/test/parallel/test-worker-message-port-inspect-during-init-hook.js index 30b90710a604f1..8f9678de1e970e 100644 --- a/test/parallel/test-worker-message-port-inspect-during-init-hook.js +++ b/test/parallel/test-worker-message-port-inspect-during-init-hook.js @@ -10,8 +10,9 @@ const { MessageChannel } = require('worker_threads'); async_hooks.createHook({ init: common.mustCall((id, type, triggerId, resource) => { - assert.strictEqual(util.inspect(resource), - 'MessagePort { active: true, refed: false }'); + assert.strictEqual( + util.inspect(resource), + 'MessagePort [EventTarget] { active: true, refed: false }'); }, 2) }).enable(); diff --git a/test/parallel/test-worker-message-port-transfer-filehandle.js b/test/parallel/test-worker-message-port-transfer-filehandle.js index 157acb22e8ad55..c42d2e8b4042cf 100644 --- a/test/parallel/test-worker-message-port-transfer-filehandle.js +++ b/test/parallel/test-worker-message-port-transfer-filehandle.js @@ -57,9 +57,9 @@ const { once } = require('events'); }); // TODO(addaleax): Switch this to a 'messageerror' event once MessagePort // implements EventTarget fully and in a cross-context manner. - port2moved.emit = common.mustCall((name, err) => { - assert.strictEqual(name, 'messageerror'); - assert.strictEqual(err.code, 'ERR_MESSAGE_TARGET_CONTEXT_UNAVAILABLE'); + port2moved.onmessageerror = common.mustCall((event) => { + assert.strictEqual(event.data.code, + 'ERR_MESSAGE_TARGET_CONTEXT_UNAVAILABLE'); }); port2moved.start(); diff --git a/test/parallel/test-worker-message-port.js b/test/parallel/test-worker-message-port.js index d128dc7edb25fd..4f4863c45ed516 100644 --- a/test/parallel/test-worker-message-port.js +++ b/test/parallel/test-worker-message-port.js @@ -154,7 +154,9 @@ const { MessageChannel, MessagePort } = require('worker_threads'); assert.deepStrictEqual( Object.getOwnPropertyNames(MessagePort.prototype).sort(), [ - 'close', 'constructor', 'onmessage', 'postMessage', 'ref', 'start', + // TODO(addaleax): This should include onmessage (and eventually + // onmessageerror). + 'close', 'constructor', 'postMessage', 'ref', 'start', 'unref' ]); }